package cz.o2.proxima.gcloud.storage;

import cz.o2.proxima.functional.Factory;
import cz.o2.proxima.gcloud.storage.BinaryBlob;
import cz.o2.proxima.gcloud.storage.shaded.com.google.cloud.WriteChannel;
import cz.o2.proxima.gcloud.storage.shaded.com.google.cloud.storage.Blob;
import cz.o2.proxima.gcloud.storage.shaded.com.google.cloud.storage.Storage;
import cz.o2.proxima.gcloud.storage.shaded.com.google.cloud.storage.StorageClass;
import cz.o2.proxima.gcloud.storage.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.repository.Context;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.BulkAttributeWriter;
import cz.o2.proxima.storage.CommitCallback;
import cz.o2.proxima.storage.StreamElement;
import cz.seznam.euphoria.core.util.ExceptionUtils;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.net.InetAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.commons.codec.binary.Hex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/gcloud/storage/BulkGCloudStorageWriter.class */
public class BulkGCloudStorageWriter extends GCloudClient implements BulkAttributeWriter {
    /** Class logger. */
    private static final Logger log = LoggerFactory.getLogger((Class<?>) BulkGCloudStorageWriter.class);
    /** Formats the year/month directory part that starts every blob name (see {@code toBlobName}). */
    private static final DateTimeFormatter DIR_FORMAT = DateTimeFormatter.ofPattern("yyyy/MM/");

    /** First six hex characters of the MD5 digest of the local host name; assigned in the static initializer. */
    @VisibleForTesting
    static final String PREFIX;
    /** Supplies the executor on which flushes run; bound to {@code Context#getExecutorService} in the constructor. */
    private final Factory<Executor> executorFactory;
    /** Local directory holding the temporary blobs; configured via {@code tmp.dir}. */
    private final File tmpDir;
    /** Width of one bucket in stamp units; configured via {@code log-roll-interval}, default 3600000. */
    private final long rollPeriod;
    /** Passed to {@code BinaryBlob#writer(boolean)}; configured via {@code gzip}, default false. */
    private final boolean gzip;
    /** Size in bytes of the copy buffer used by {@code flushToBlob}; configured via {@code buffer-size}, default 1048576. */
    private final int bufferSize;
    /** Subtracted from the maximal seen stamp to obtain the flush boundary; configured via {@code allowed-lateness-ms}, default 300000. */
    private final long allowedLateness;
    /** Minimal stamp distance between two flush attempts; configured via {@code flush-delay-ms}, default 5000. */
    private final long flushAttemptDelay;

    /** Open buckets keyed by their end stamp (flush boundary plus {@link #rollPeriod}). */
    @SuppressFBWarnings(value = {"SE_BAD_FIELD"}, justification = "Serialized empty. After first write the writer is not considered serializable anymore.")
    private final NavigableMap<Long, BucketData> buckets;
    /** Highest element stamp written so far; {@code Long.MIN_VALUE} before the first write. */
    private long maxSeenTimestamp;
    /** Stamp of the element that triggered the last flush attempt; {@code Long.MIN_VALUE} if none yet. */
    private long lastFlushAttempt;
    /** Sequence number handed to the next written element. */
    private long writeSeqNo;
    /** Lazily created executor, see {@code flushExecutor()}. */
    private transient Executor flushExecutor;
    /** Set once {@code init()} has prepared the temporary directory. */
    private transient boolean initialized;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cz/o2/proxima/gcloud/storage/BulkGCloudStorageWriter$BucketData.class */
    /**
     * State of a single time bucket: the local blob, its open writer and
     * bookkeeping about the last element written into it.
     */
    public class BucketData {
        /** Local blob that backs this bucket. */
        final BinaryBlob blob;
        /** Writer appending to {@link #blob}. */
        final BinaryBlob.Writer writer;

        /** Callback most recently stored through {@link #setCommitter}; {@code null} until then. */
        @Nullable
        CommitCallback committer = null;
        /** Value most recently stored through {@link #setLastWriteWatermark}. */
        long lastWriteWatermark = 0;
        /** Value most recently stored through {@link #setLastWriteSeqNo}. */
        long lastWriteSeqNo = 0;

        /**
         * Creates a fresh local blob and opens a writer on it.
         *
         * @throws RuntimeException wrapping the {@link IOException} when that fails
         */
        BucketData() {
            final BinaryBlob localBlob;
            final BinaryBlob.Writer blobWriter;
            try {
                localBlob = BulkGCloudStorageWriter.this.createLocalBlob();
                blobWriter = localBlob.writer(BulkGCloudStorageWriter.this.gzip);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            this.blob = localBlob;
            this.writer = blobWriter;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("BulkGCloudStorageWriter.BucketData(");
            text.append("blob=").append(getBlob());
            text.append(", writer=").append(getWriter());
            text.append(", committer=").append(getCommitter());
            text.append(", lastWriteWatermark=").append(getLastWriteWatermark());
            text.append(", lastWriteSeqNo=").append(getLastWriteSeqNo());
            return text.append(")").toString();
        }

        /** @return the local blob of this bucket */
        public BinaryBlob getBlob() {
            return blob;
        }

        /** @return the writer opened on the local blob */
        public BinaryBlob.Writer getWriter() {
            return writer;
        }

        /** @return the stored commit callback, possibly {@code null} */
        @Nullable
        public CommitCallback getCommitter() {
            return committer;
        }

        /** @param commitCallback callback to store, may be {@code null} */
        public void setCommitter(@Nullable CommitCallback commitCallback) {
            committer = commitCallback;
        }

        /** @return the stored watermark */
        public long getLastWriteWatermark() {
            return lastWriteWatermark;
        }

        /** @param j watermark to store */
        public void setLastWriteWatermark(long j) {
            lastWriteWatermark = j;
        }

        /** @return the stored sequence number */
        public long getLastWriteSeqNo() {
            return lastWriteSeqNo;
        }

        /** @param j sequence number to store */
        public void setLastWriteSeqNo(long j) {
            lastWriteSeqNo = j;
        }
    }

    /**
     * Creates the writer and reads its settings from the configuration map.
     *
     * <p>Recognized keys (values are converted through {@code toString()}):
     * {@code tmp.dir}, {@code log-roll-interval}, {@code gzip}, {@code buffer-size},
     * {@code allowed-lateness-ms} and {@code flush-delay-ms}. Missing keys fall back
     * to the defaults visible below.
     *
     * @param entityDescriptor passed to the superclass
     * @param uri passed to the superclass
     * @param map configuration, also passed to the superclass
     * @param context source of the executor used for flushing
     */
    public BulkGCloudStorageWriter(EntityDescriptor entityDescriptor, URI uri, Map<String, Object> map, Context context) {
        super(entityDescriptor, uri, map);
        this.buckets = new TreeMap<>();
        this.maxSeenTimestamp = Long.MIN_VALUE;
        this.lastFlushAttempt = Long.MIN_VALUE;
        this.writeSeqNo = 0L;

        this.tmpDir = Optional.ofNullable(map.get("tmp.dir"))
                .map(Object::toString)
                .map(File::new)
                .orElse(new File("/tmp/bulk-cloud-storage-" + UUID.randomUUID()));
        this.rollPeriod = Optional.ofNullable(map.get("log-roll-interval"))
                .map(Object::toString)
                .map(Long::valueOf)
                .orElse(3600000L);
        this.gzip = Optional.ofNullable(map.get("gzip"))
                .map(Object::toString)
                .map(Boolean::valueOf)
                .orElse(false);
        this.bufferSize = Optional.ofNullable(map.get("buffer-size"))
                .map(Object::toString)
                .map(Integer::valueOf)
                .orElse(1048576);
        this.allowedLateness = Optional.ofNullable(map.get("allowed-lateness-ms"))
                .map(Object::toString)
                .map(Long::valueOf)
                .orElse(300000L);
        this.flushAttemptDelay = Optional.ofNullable(map.get("flush-delay-ms"))
                .map(Object::toString)
                .map(Long::valueOf)
                .orElse(5000L);

        // Evaluating the bound method reference already fails with a
        // NullPointerException when context is null, so no separate check is needed.
        this.executorFactory = context::getExecutorService;
    }

    /**
     * Appends the element to the bucket matching its stamp and, when the element
     * advances the maximal seen stamp, possibly triggers a flush of old buckets.
     *
     * <p>A flush is attempted only if no attempt happened yet or the stamp moved
     * at least {@code flushAttemptDelay} past the previous attempt. Any exception
     * is logged and reported through {@code commitCallback} as a failure.
     *
     * @param streamElement element to write
     * @param commitCallback callback remembered on the bucket and used to report errors
     */
    public void write(StreamElement streamElement, CommitCallback commitCallback) {
        try {
            init();
            final long stamp = streamElement.getStamp();
            final BucketData bucket = getOrCreateWriterFor(stamp);
            bucket.setCommitter(commitCallback);
            bucket.getWriter().write(streamElement);
            bucket.setLastWriteSeqNo(this.writeSeqNo++);

            final boolean watermarkAdvanced = stamp > this.maxSeenTimestamp;
            if (watermarkAdvanced) {
                this.maxSeenTimestamp = stamp;
            }
            bucket.setLastWriteWatermark(this.maxSeenTimestamp);

            if (!watermarkAdvanced) {
                return;
            }
            final boolean neverAttempted = this.lastFlushAttempt == Long.MIN_VALUE;
            if (neverAttempted || stamp - this.lastFlushAttempt >= this.flushAttemptDelay) {
                flushWriters(this.maxSeenTimestamp - this.allowedLateness);
                this.lastFlushAttempt = stamp;
            }
        } catch (Exception e) {
            log.warn("Exception writing data {}", streamElement, e);
            commitCallback.commit(false, e);
        }
    }

    /**
     * Returns the bucket the given stamp belongs to, creating it on first use.
     * Buckets are keyed by their end stamp, i.e. flush boundary plus roll period.
     *
     * @param j element stamp
     * @return existing or newly created bucket
     */
    private BucketData getOrCreateWriterFor(long j) {
        final Long bucketEnd = getFlushBoundary(j) + this.rollPeriod;
        BucketData bucket = this.buckets.get(bucketEnd);
        if (bucket == null) {
            bucket = new BucketData();
            this.buckets.put(bucketEnd, bucket);
        }
        return bucket;
    }

    /**
     * Rounds the stamp towards zero to a multiple of the roll period.
     *
     * @param j element stamp
     * @return the stamp with its remainder modulo {@code rollPeriod} removed
     */
    private long getFlushBoundary(long j) {
        final long offsetInPeriod = j % this.rollPeriod;
        return j - offsetInPeriod;
    }

    /**
     * Flushes every bucket whose end stamp is not greater than the given boundary.
     *
     * <p>While scanning, a bucket that was written to after the watermark passed
     * its own end moves the boundary to the end of the following bucket, so that
     * bucket is flushed as well. The commit callback of the bucket holding the
     * highest write sequence number is confirmed once all flushes of the batch
     * succeeded; a failure is reported to it exactly once per batch.
     *
     * @param j flush boundary (bucket end stamps up to and including it are flushed)
     */
    private void flushWriters(long j) {
        // Only the keys are remembered: live TreeMap entries are not guaranteed to
        // stay stable once the map is modified during the flush loop below.
        final ArrayList<Long> flushableEnds = new ArrayList<>();
        long boundary = j;
        long newestSeqNo = -1;
        CommitCallback newestCommitter = null;
        for (Map.Entry<Long, BucketData> entry : this.buckets.entrySet()) {
            final long bucketEnd = entry.getKey();
            if (bucketEnd > boundary) {
                break;
            }
            final BucketData bucket = entry.getValue();
            flushableEnds.add(bucketEnd);
            if (bucket.getLastWriteWatermark() >= bucketEnd) {
                boundary = bucketEnd + this.rollPeriod;
            }
            if (bucket.getLastWriteSeqNo() > newestSeqNo) {
                newestSeqNo = bucket.getLastWriteSeqNo();
                newestCommitter = bucket.getCommitter();
            }
        }
        if (flushableEnds.isEmpty()) {
            return;
        }

        final CommitCallback committer = newestCommitter;
        final AtomicInteger pending = new AtomicInteger(flushableEnds.size());
        final CommitCallback batchCallback = (succ, err) -> {
            if (!succ) {
                // A negative previous value means an earlier failure already
                // reported this batch; do not notify the committer twice.
                if (pending.getAndSet(-1) >= 0) {
                    committer.commit(false, err);
                }
            } else if (pending.decrementAndGet() == 0) {
                committer.commit(true, (Throwable) null);
            }
        };

        for (Long key : flushableEnds) {
            final long bucketEnd = key;
            final BucketData bucket = this.buckets.get(key);
            ExceptionUtils.unchecked(() -> {
                flushWriter(bucketEnd, bucket.getBlob(), bucket.getWriter(), batchCallback);
            });
            // Removed only after the flush was handed over successfully.
            this.buckets.remove(key);
        }
    }

    /** Drops all buffered state and re-initializes the writer. */
    public void rollback() {
        final boolean resetState = true;
        init(resetState);
    }

    /**
     * Prepares the temporary directory on first use: creates it when missing,
     * otherwise empties it. Does nothing once the writer is initialized.
     *
     * @throws IllegalStateException if the configured path exists but is not a directory
     */
    private void init() {
        if (!this.initialized) {
            final boolean alreadyPresent = this.tmpDir.exists();
            if (alreadyPresent) {
                if (!this.tmpDir.isDirectory()) {
                    throw new IllegalStateException("Temporary directory " + this.tmpDir + " is not directory");
                }
                // wipe leftovers found in the directory
                remove(this.tmpDir);
            }
            this.tmpDir.mkdirs();
            this.tmpDir.deleteOnExit();
            this.initialized = true;
        }
    }

    /**
     * Initializes the writer, optionally discarding all in-memory state first.
     *
     * @param z when {@code true}, buckets, counters and the initialized flag are reset
     *          before the regular initialization runs
     */
    private void init(boolean z) {
        if (!z) {
            init();
            return;
        }
        this.buckets.clear();
        this.writeSeqNo = 0L;
        this.maxSeenTimestamp = Long.MIN_VALUE;
        this.lastFlushAttempt = Long.MIN_VALUE;
        this.initialized = false;
        init();
    }

    /**
     * Deletes the contents of a directory recursively (the directory itself is
     * kept). A path that is not a directory, or whose listing is unavailable,
     * is deleted directly.
     *
     * @param file directory to empty or plain file to delete
     */
    private void remove(File file) {
        final File[] children = file.isDirectory() ? file.listFiles() : null;
        if (children == null) {
            deleteHandlingErrors(file);
            return;
        }
        for (int i = 0; i < children.length; i++) {
            final File child = children[i];
            if (child.isDirectory()) {
                remove(child);
            }
            deleteHandlingErrors(child);
        }
    }

    /**
     * Creates a blob backed by a randomly named file inside the temporary directory.
     *
     * @return new local blob
     */
    @VisibleForTesting
    BinaryBlob createLocalBlob() {
        final String fileName = UUID.randomUUID().toString();
        final File target = new File(this.tmpDir, fileName);
        return new BinaryBlob(target);
    }

    /** Flushes all buckets regardless of their end stamp. */
    @VisibleForTesting
    void flush() {
        final long everything = Long.MAX_VALUE;
        flushWriters(everything);
    }

    /**
     * Uploads one local file as a blob covering {@code [j - rollPeriod, j)},
     * removes the local file (deletion errors are only logged) and confirms
     * the callback. Any exception is reported through the callback as a failure.
     *
     * @param file local file to upload
     * @param j end stamp of the bucket
     * @param commitCallback receives the outcome
     */
    private void flush(File file, long j, CommitCallback commitCallback) {
        try {
            final String blobName = toBlobName(j - this.rollPeriod, j);
            final Blob target = createBlob(blobName);
            flushToBlob(j, file, target);
            deleteHandlingErrors(file, false);
            commitCallback.commit(true, (Throwable) null);
        } catch (Exception e) {
            commitCallback.commit(false, e);
        }
    }

    /**
     * Builds the blob name {@code yyyy/MM/<PREFIX>-<j>_<j2>_<uuid>.blob}, where
     * the directory part is derived from {@code j} interpreted as epoch millis in UTC.
     *
     * <p>The name is assembled by plain concatenation rather than
     * {@code String.format}: {@code %d} formats digits according to the JVM
     * default locale, which would make blob names locale dependent.
     *
     * @param j start stamp of the bucket
     * @param j2 end stamp of the bucket
     * @return name of the blob
     */
    @VisibleForTesting
    String toBlobName(long j, long j2) {
        final String directory = DIR_FORMAT.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(j), ZoneOffset.UTC));
        return directory + PREFIX + "-" + j + "_" + j2 + "_" + uuid() + ".blob";
    }

    /**
     * Supplies the random part of a blob name.
     *
     * @return textual form of a freshly generated random UUID
     */
    @VisibleForTesting
    String uuid() {
        final UUID random = UUID.randomUUID();
        return random.toString();
    }

    /**
     * Copies the content of a local file into the given blob.
     *
     * <p>The file is read until {@code read} signals end of stream; relying on
     * {@code available()} is avoided because it is only an estimate and a
     * {@code -1} read result must never reach {@code ByteBuffer.wrap}. The byte
     * counter is a {@code long} so files above 2 GiB are reported correctly.
     *
     * @param j end stamp of the flushed bucket (not used by the copy itself)
     * @param file local file to upload
     * @param blob target blob
     * @throws IOException when reading the file or writing the blob fails
     */
    @VisibleForTesting
    void flushToBlob(long j, File file, Blob blob) throws IOException {
        long written = 0;
        // Resources close in reverse order: the input stream first, then the channel.
        try (WriteChannel channel = client().writer(blob, new Storage.BlobWriteOption[0]);
                FileInputStream input = new FileInputStream(file)) {
            // a zero-length buffer would make read() return 0 forever
            final byte[] buffer = new byte[Math.max(1, this.bufferSize)];
            int read;
            while ((read = input.read(buffer)) != -1) {
                channel.write(ByteBuffer.wrap(buffer, 0, read));
                written += read;
            }
        }
        log.info("Flushed blob {} with size {} KiB", blob.getBlobId().getName(), Double.valueOf(written / 1024.0d));
    }

    /**
     * Deletes the file if it exists; a failure is rethrown as a runtime exception.
     *
     * @param file file to delete
     */
    private void deleteHandlingErrors(File file) {
        final boolean failOnError = true;
        deleteHandlingErrors(file, failOnError);
    }

    /**
     * Deletes the file if it exists.
     *
     * @param file file to delete
     * @param z when {@code true} an {@link IOException} is rethrown wrapped in a
     *          {@link RuntimeException}; otherwise it is only logged
     */
    private void deleteHandlingErrors(File file, boolean z) {
        try {
            Files.deleteIfExists(Paths.get(file.getAbsolutePath()));
        } catch (IOException e) {
            if (!z) {
                log.warn("Failed to delete {}. Ingoring", file, e);
                return;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the executor used for flushing, obtaining it from the factory on
     * first use and caching it afterwards.
     *
     * @return flush executor
     */
    Executor flushExecutor() {
        Executor executor = this.flushExecutor;
        if (executor == null) {
            executor = this.executorFactory.apply();
            this.flushExecutor = executor;
        }
        return executor;
    }

    /**
     * Flushes every remaining bucket, waiting for each flush to finish, and then
     * clears the bucket map. Failures are logged and do not stop the remaining
     * buckets from being flushed.
     *
     * <p>If the calling thread is interrupted while waiting, the remaining
     * buckets are still processed and the interrupt status is restored before
     * returning, so callers can observe the interruption.
     */
    public void close() {
        boolean interrupted = false;
        for (Map.Entry<Long, BucketData> entry : this.buckets.entrySet()) {
            final BucketData bucketData = entry.getValue();
            try {
                final CountDownLatch finished = new CountDownLatch(1);
                flushWriter(entry.getKey(), bucketData.getBlob(), bucketData.getWriter(), (succ, err) -> {
                    if (!succ) {
                        log.warn("Failed to close writer {}", bucketData.getWriter(), err);
                    }
                    finished.countDown();
                });
                finished.await();
            } catch (InterruptedException e) {
                // remember the interruption, it is re-asserted once all buckets were handled
                interrupted = true;
                log.warn("Failed to close writer {}", bucketData.getWriter(), e);
            } catch (Exception e) {
                log.warn("Failed to close writer {}", bucketData.getWriter(), e);
            }
        }
        this.buckets.clear();
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Closes the bucket writer and schedules the upload of its local file on the
     * flush executor. Nothing happens when the writer is {@code null}.
     *
     * @param j end stamp of the bucket
     * @param binaryBlob local blob of the bucket
     * @param writer writer to close, may be {@code null}
     * @param commitCallback receives the outcome of the upload
     * @throws IOException when closing the writer fails
     */
    private void flushWriter(long j, BinaryBlob binaryBlob, BinaryBlob.Writer writer, CommitCallback commitCallback) throws IOException {
        if (writer == null) {
            return;
        }
        writer.close();
        final File localFile = binaryBlob.getPath();
        final Runnable upload = () -> flush(localFile, j, commitCallback);
        flushExecutor().execute(upload);
    }

    /**
     * Delegates to the superclass implementation; marked by the decompiler as a
     * compiler-generated bridge method.
     *
     * @return whatever {@code GCloudClient#getClient()} returns, possibly {@code null}
     */
    @Override // cz.o2.proxima.gcloud.storage.GCloudClient
    @Nullable
    public /* bridge */ /* synthetic */ Storage getClient() {
        return super.getClient();
    }

    /**
     * Delegates to the superclass implementation; marked by the decompiler as a
     * compiler-generated bridge method.
     *
     * @return whatever {@code GCloudClient#getStorageClass()} returns
     */
    @Override // cz.o2.proxima.gcloud.storage.GCloudClient
    public /* bridge */ /* synthetic */ StorageClass getStorageClass() {
        return super.getStorageClass();
    }

    /**
     * Delegates to the superclass implementation; marked by the decompiler as a
     * compiler-generated bridge method.
     *
     * @return whatever {@code GCloudClient#getPath()} returns
     */
    @Override // cz.o2.proxima.gcloud.storage.GCloudClient
    public /* bridge */ /* synthetic */ String getPath() {
        return super.getPath();
    }

    /**
     * Delegates to the superclass implementation; marked by the decompiler as a
     * compiler-generated bridge method.
     *
     * @return whatever {@code GCloudClient#getBucket()} returns
     */
    @Override // cz.o2.proxima.gcloud.storage.GCloudClient
    public /* bridge */ /* synthetic */ String getBucket() {
        return super.getBucket();
    }

    /**
     * Delegates to the superclass implementation; marked by the decompiler as a
     * compiler-generated bridge method.
     *
     * @return whatever {@code GCloudClient#getCfg()} returns
     */
    @Override // cz.o2.proxima.gcloud.storage.GCloudClient
    public /* bridge */ /* synthetic */ Map getCfg() {
        return super.getCfg();
    }

    /**
     * Synthetic hook that rebuilds a serialized lambda of this class. The only
     * shape it accepts is the bound method reference {@code context::getExecutorService}
     * implementing {@code Factory#apply}; anything else is rejected.
     *
     * NOTE(review): the {@code boolean z = -1} / {@code switch (z)} construct below is
     * decompiler output and is not valid Java source as written - presumably an
     * int-based string switch in the original bytecode; confirm before recompiling.
     *
     * @param serializedLambda serialized form of the lambda
     * @return the re-created method reference
     * @throws IllegalArgumentException when the serialized form does not match
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // first switch: map the implementation method name to a case index
        switch (implMethodName.hashCode()) {
            case 1854485548:
                if (implMethodName.equals("getExecutorService")) {
                    z = false;
                    break;
                }
                break;
        }
        // second switch: verify the full lambda signature before re-creating it
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/repository/Context") && serializedLambda.getImplMethodSignature().equals("()Ljava/util/concurrent/ExecutorService;")) {
                    // the captured argument is the Context the method reference was bound to
                    Context context = (Context) serializedLambda.getCapturedArg(0);
                    return context::getExecutorService;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    static {
        try {
            MessageDigest messageDigest = MessageDigest.getInstance("MD5");
            messageDigest.update(InetAddress.getLocalHost().getHostName().getBytes(Charset.defaultCharset()));
            PREFIX = new String(Hex.encodeHex(messageDigest.digest())).substring(0, 6);
        } catch (Exception e) {
            log.error("Failed to generate bucket prefix", (Throwable) e);
            throw new RuntimeException(e);
        }
    }
}
