package software.amazon.awssdk.http.crt.internal;

import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongUnaryOperator;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.http.HttpClientConnection;
import software.amazon.awssdk.crt.http.HttpStream;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;

@SdkInternalApi
/* loaded from: input_file:software/amazon/awssdk/http/crt/internal/AwsCrtResponseBodyPublisher.class */
public final class AwsCrtResponseBodyPublisher implements Publisher<ByteBuffer> {
    private static final Logger log = Logger.loggerFor(AwsCrtResponseBodyPublisher.class);
    private static final LongUnaryOperator DECREMENT_IF_GREATER_THAN_ZERO = j -> {
        return j > 0 ? j - 1 : j;
    };
    private final HttpClientConnection connection;
    private final HttpStream stream;
    private final CompletableFuture<Void> responseComplete;
    private final int windowSize;
    private final AtomicLong outstandingRequests = new AtomicLong(0);
    private final AtomicBoolean isCancelled = new AtomicBoolean(false);
    private final AtomicBoolean areNativeResourcesReleased = new AtomicBoolean(false);
    private final AtomicBoolean isSubscriptionComplete = new AtomicBoolean(false);
    private final AtomicBoolean queueComplete = new AtomicBoolean(false);
    private final AtomicInteger mutualRecursionDepth = new AtomicInteger(0);
    private final AtomicInteger queuedBytes = new AtomicInteger(0);
    private final AtomicReference<Subscriber<? super ByteBuffer>> subscriberRef = new AtomicReference<>(null);
    private final Queue<byte[]> queuedBuffers = new ConcurrentLinkedQueue();
    private final AtomicReference<Throwable> error = new AtomicReference<>(null);

    /* loaded from: input_file:software/amazon/awssdk/http/crt/internal/AwsCrtResponseBodyPublisher$AwsCrtResponseBodySubscription.class */
    static class AwsCrtResponseBodySubscription implements Subscription {
        private final AwsCrtResponseBodyPublisher publisher;

        AwsCrtResponseBodySubscription(AwsCrtResponseBodyPublisher awsCrtResponseBodyPublisher) {
            this.publisher = awsCrtResponseBodyPublisher;
        }

        public void request(long j) {
            if (j <= 0) {
                this.publisher.setError(new IllegalArgumentException("Request is for <= 0 elements: " + j));
                this.publisher.publishToSubscribers();
            } else {
                this.publisher.request(j);
                this.publisher.publishToSubscribers();
            }
        }

        public void cancel() {
            this.publisher.setCancelled();
        }
    }

    public AwsCrtResponseBodyPublisher(HttpClientConnection httpClientConnection, HttpStream httpStream, CompletableFuture<Void> completableFuture, int i) {
        this.connection = (HttpClientConnection) Validate.notNull(httpClientConnection, "HttpConnection must not be null", new Object[0]);
        this.stream = (HttpStream) Validate.notNull(httpStream, "Stream must not be null", new Object[0]);
        this.responseComplete = (CompletableFuture) Validate.notNull(completableFuture, "ResponseComplete future must not be null", new Object[0]);
        this.windowSize = Validate.isPositive(i, "windowSize must be > 0");
    }

    public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
        Validate.notNull(subscriber, "Subscriber must not be null", new Object[0]);
        if (this.subscriberRef.compareAndSet(null, subscriber)) {
            subscriber.onSubscribe(new AwsCrtResponseBodySubscription(this));
            return;
        }
        log.error(() -> {
            return "Only one subscriber allowed";
        });
        subscriber.onSubscribe(new Subscription() { // from class: software.amazon.awssdk.http.crt.internal.AwsCrtResponseBodyPublisher.1
            public void request(long j) {
            }

            public void cancel() {
            }
        });
        subscriber.onError(new IllegalStateException("Only one subscriber allowed"));
    }

    public void queueBuffer(byte[] bArr) {
        Validate.notNull(bArr, "ByteBuffer must not be null", new Object[0]);
        if (this.isCancelled.get()) {
            this.stream.incrementWindow(bArr.length);
            return;
        }
        this.queuedBuffers.add(bArr);
        int addAndGet = this.queuedBytes.addAndGet(bArr.length);
        if (addAndGet > this.windowSize) {
            throw new IllegalStateException("Queued more than Window Size: queued=" + addAndGet + ", window=" + this.windowSize);
        }
    }

    protected void request(long j) {
        long addAndGet;
        Validate.inclusiveBetween(1L, Long.MAX_VALUE, j, "request");
        if (j > Long.MAX_VALUE - this.outstandingRequests.get()) {
            this.outstandingRequests.set(Long.MAX_VALUE);
            addAndGet = Long.MAX_VALUE;
        } else {
            addAndGet = this.outstandingRequests.addAndGet(j);
        }
        publishToSubscribers();
        long j2 = addAndGet;
        log.trace(() -> {
            return "Subscriber Requested more Buffers. Outstanding Requests: " + j2;
        });
    }

    public void setError(Throwable th) {
        log.error(() -> {
            return "Error processing Response Body";
        }, th);
        this.error.compareAndSet(null, th);
    }

    protected void setCancelled() {
        this.isCancelled.set(true);
        this.subscriberRef.set(null);
    }

    private synchronized void releaseNativeResources() {
        if (this.areNativeResourcesReleased.getAndSet(true)) {
            return;
        }
        this.stream.close();
        this.connection.close();
    }

    public void setQueueComplete() {
        log.trace(() -> {
            return "Response Body Publisher queue marked as completed.";
        });
        this.queueComplete.set(true);
        releaseNativeResources();
    }

    protected void completeSubscriptionExactlyOnce() {
        if (this.isSubscriptionComplete.getAndSet(true)) {
            return;
        }
        Optional ofNullable = Optional.ofNullable(this.subscriberRef.getAndSet(null));
        Throwable th = this.error.get();
        releaseNativeResources();
        if (th != null) {
            log.error(() -> {
                return "Error before ResponseBodyPublisher could complete: " + th.getMessage();
            });
            try {
                ofNullable.ifPresent(subscriber -> {
                    subscriber.onError(th);
                });
            } catch (Exception e) {
                log.warn(() -> {
                    return "Failed to exceptionally complete subscriber future with: " + th.getMessage();
                });
            }
            this.responseComplete.completeExceptionally(th);
            return;
        }
        log.debug(() -> {
            return "ResponseBodyPublisher Completed Successfully";
        });
        try {
            ofNullable.ifPresent((v0) -> {
                v0.onComplete();
            });
        } catch (Exception e2) {
            log.warn(() -> {
                return "Failed to successfully complete subscriber future";
            });
        }
        this.responseComplete.complete(null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void publishToSubscribers() {
        boolean z;
        synchronized (this) {
            if (this.error.get() != null) {
                z = true;
            } else {
                if (this.isSubscriptionComplete.get() || this.isCancelled.get()) {
                    log.debug(() -> {
                        return "Subscription already completed or cancelled, can't publish updates to Subscribers.";
                    });
                    return;
                }
                if (this.mutualRecursionDepth.get() > 0) {
                    return;
                }
                int i = 0;
                while (this.outstandingRequests.get() > 0 && !this.queuedBuffers.isEmpty()) {
                    byte[] poll = this.queuedBuffers.poll();
                    this.outstandingRequests.getAndUpdate(DECREMENT_IF_GREATER_THAN_ZERO);
                    int length = poll.length;
                    publishWithoutMutualRecursion(this.subscriberRef.get(), ByteBuffer.wrap(poll));
                    i += length;
                }
                if (i > 0) {
                    this.queuedBytes.addAndGet(-i);
                    if (!this.areNativeResourcesReleased.get()) {
                        this.stream.incrementWindow(i);
                    }
                }
                z = this.queueComplete.get() && this.queuedBuffers.isEmpty();
            }
            if (z) {
                completeSubscriptionExactlyOnce();
            }
        }
    }

    private synchronized void publishWithoutMutualRecursion(Subscriber<? super ByteBuffer> subscriber, ByteBuffer byteBuffer) {
        try {
            if (this.mutualRecursionDepth.getAndIncrement() == 0) {
                subscriber.onNext(byteBuffer);
            }
        } finally {
            this.mutualRecursionDepth.decrementAndGet();
        }
    }
}
