package cz.o2.proxima.client;

import cz.o2.proxima.proto.service.IngestServiceGrpc;
import cz.o2.proxima.proto.service.RetrieveServiceGrpc;
import cz.o2.proxima.proto.service.Rpc;
import cz.seznam.euphoria.shadow.com.google.common.annotations.VisibleForTesting;
import cz.seznam.euphoria.shadow.com.google.common.base.Strings;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/client/IngestClient.class */
public class IngestClient implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(IngestClient.class);
    private final String host;
    private final int port;
    private final Options options;
    private final Thread flushThread;
    StreamObserver<Rpc.IngestBulk> requestObserver;
    private final Map<String, Request> inflightRequests = Collections.synchronizedMap(new HashMap());
    Channel channel = null;
    IngestServiceGrpc.IngestServiceStub stub = null;
    RetrieveServiceGrpc.RetrieveServiceBlockingStub getStub = null;
    final Rpc.IngestBulk.Builder bulkBuilder = Rpc.IngestBulk.newBuilder();
    final CountDownLatch closedLatch = new CountDownLatch(1);
    final StreamObserver<Rpc.StatusBulk> statusObserver = new StreamObserver<Rpc.StatusBulk>() { // from class: cz.o2.proxima.client.IngestClient.1
        public void onNext(Rpc.StatusBulk statusBulk) {
            for (Rpc.Status status : statusBulk.getStatusList()) {
                Request request = (Request) IngestClient.this.inflightRequests.remove(status.getUuid());
                if (request == null) {
                    IngestClient.log.warn("Received response for unknown message " + status);
                } else {
                    synchronized (IngestClient.this.inflightRequests) {
                        IngestClient.this.inflightRequests.notifyAll();
                    }
                    request.setStatus(status);
                }
            }
        }

        public void onError(Throwable th) {
            IngestClient.log.warn("Error on channel, closing stub", th);
            synchronized (IngestClient.this) {
                IngestClient.this.stub = null;
                IngestClient.this.createChannelAndStub();
            }
        }

        public void onCompleted() {
            synchronized (IngestClient.this.inflightRequests) {
                IngestClient.this.inflightRequests.clear();
            }
            IngestClient.this.closedLatch.countDown();
        }
    };
    private final ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/o2/proxima/client/IngestClient$Request.class */
    public class Request {
        final Consumer<Rpc.Status> consumer;
        final ScheduledFuture timeoutFuture;
        final Rpc.Ingest payload;

        Request(Consumer<Rpc.Status> consumer, ScheduledFuture scheduledFuture, Rpc.Ingest ingest) {
            this.consumer = consumer;
            this.timeoutFuture = scheduledFuture;
            this.payload = ingest;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setStatus(Rpc.Status status) {
            if (this.timeoutFuture == null || this.timeoutFuture.cancel(false)) {
                this.consumer.accept(status);
            }
        }

        void retry() {
            IngestClient.this.sendTry(this.payload, -1L, TimeUnit.MILLISECONDS, this.consumer, true);
        }

        public Consumer<Rpc.Status> getConsumer() {
            return this.consumer;
        }

        public ScheduledFuture getTimeoutFuture() {
            return this.timeoutFuture;
        }

        public Rpc.Ingest getPayload() {
            return this.payload;
        }
    }

    public static IngestClient create(String str, int i) {
        return create(str, i, new Options());
    }

    public static IngestClient create(String str, int i, Options options) {
        return new IngestClient(str, i, options);
    }

    @VisibleForTesting
    IngestClient(String str, int i, Options options) {
        this.host = str;
        this.port = i;
        this.options = options;
        this.flushThread = new Thread(() -> {
            long flushUsec = options.getFlushUsec() * 1000;
            long nanoTime = System.nanoTime();
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    long nanoTime2 = System.nanoTime();
                    long j = (flushUsec - nanoTime2) + nanoTime;
                    synchronized (this) {
                        if (j > 0) {
                            wait(j / 1000000, (int) (j % 1000000));
                        }
                    }
                    synchronized (this) {
                        if (this.bulkBuilder.getIngestCount() > 0) {
                            flush();
                        }
                        nanoTime = nanoTime2;
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        this.flushThread.setDaemon(true);
        this.flushThread.setName(getClass().getSimpleName() + "-flushThread");
    }

    public void send(Rpc.Ingest ingest, Consumer<Rpc.Status> consumer) {
        send(ingest, -1L, TimeUnit.SECONDS, consumer);
    }

    public void send(Rpc.Ingest ingest, long j, TimeUnit timeUnit, Consumer<Rpc.Status> consumer) {
        sendTry(ingest, j, timeUnit, consumer, false);
    }

    public Rpc.GetResponse get(Rpc.GetRequest getRequest) {
        ensureChannel();
        return this.getStub.get(getRequest);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendTry(Rpc.Ingest ingest, long j, TimeUnit timeUnit, Consumer<Rpc.Status> consumer, boolean z) {
        if (Strings.isNullOrEmpty(ingest.getUuid())) {
            throw new IllegalArgumentException("UUID cannot be null, because it is used to confirm messages.");
        }
        synchronized (this) {
            if (!this.flushThread.isAlive()) {
                this.flushThread.start();
            }
            ensureChannel();
        }
        while (!z && this.inflightRequests.size() >= this.options.getMaxInflightRequests()) {
            synchronized (this.inflightRequests) {
                try {
                    this.inflightRequests.wait(100L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    consumer.accept(Rpc.Status.newBuilder().setStatus(417).setStatusMessage("Interrupted while waiting for the requests to settle").build());
                    return;
                }
            }
        }
        ScheduledFuture<?> scheduledFuture = null;
        if (j > 0) {
            scheduledFuture = this.timer.schedule(() -> {
                this.inflightRequests.remove(ingest.getUuid());
                consumer.accept(Rpc.Status.newBuilder().setStatus(504).setStatusMessage("Timeout while waiting for response of request UUID " + ingest.getUuid()).build());
            }, j, timeUnit);
        }
        this.inflightRequests.putIfAbsent(ingest.getUuid(), new Request(consumer, scheduledFuture, ingest));
        synchronized (this) {
            this.bulkBuilder.addIngest(ingest);
            if (this.bulkBuilder.getIngestCount() >= this.options.getMaxFlushRecords()) {
                synchronized (this.flushThread) {
                    this.flushThread.notify();
                }
            }
        }
    }

    @VisibleForTesting
    void createChannelAndStub() {
        if (this.channel == null) {
            this.channel = ManagedChannelBuilder.forAddress(this.host, this.port).usePlaintext(true).executor(this.options.getExecutor()).build();
        }
        this.getStub = RetrieveServiceGrpc.newBlockingStub(this.channel);
        this.stub = IngestServiceGrpc.newStub(this.channel);
        this.requestObserver = this.stub.ingestBulk(this.statusObserver);
        synchronized (this.inflightRequests) {
            this.inflightRequests.values().forEach((v0) -> {
                v0.retry();
            });
        }
    }

    private void ensureChannel() {
        if (this.channel == null) {
            createChannelAndStub();
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        boolean z;
        synchronized (this) {
            flush();
            z = this.channel != null;
        }
        if (z) {
            while (!this.inflightRequests.isEmpty()) {
                synchronized (this.inflightRequests) {
                    try {
                        this.inflightRequests.wait(100L);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
            synchronized (this) {
                this.requestObserver.onCompleted();
            }
            this.flushThread.interrupt();
            try {
                this.closedLatch.await(1L, TimeUnit.SECONDS);
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
            }
            this.channel = null;
        }
    }

    private void flush() {
        if (this.requestObserver != null) {
            this.requestObserver.onNext(this.bulkBuilder.build());
        }
        this.bulkBuilder.clear();
    }

    public String getHost() {
        return this.host;
    }

    public int getPort() {
        return this.port;
    }

    public Options getOptions() {
        return this.options;
    }
}
