package cz.o2.proxima.storage.pubsub;

import com.google.protobuf.InvalidProtocolBufferException;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.Context;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.CommitLogReader;
import cz.o2.proxima.storage.commitlog.Partitioner;
import cz.o2.proxima.storage.pubsub.io.CommitLogSource;
import cz.o2.proxima.storage.pubsub.partitioned.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.storage.pubsub.partitioned.com.google.common.base.MoreObjects;
import cz.o2.proxima.storage.pubsub.partitioned.internal.proto.PubSub;
import cz.o2.proxima.view.PartitionedLogObserver;
import cz.o2.proxima.view.PartitionedView;
import cz.seznam.euphoria.beam.BeamFlow;
import cz.seznam.euphoria.beam.io.KryoCoder;
import cz.seznam.euphoria.core.annotation.stability.Experimental;
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.flow.Flow;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
/* loaded from: input_file:cz/o2/proxima/storage/pubsub/PubSubPartitionedView.class */
class PubSubPartitionedView extends PubSubReader implements PartitionedView {
    /** Logger shared by the view and by the {@code DoFn}s it creates. */
    private static final Logger log = LoggerFactory.getLogger(PubSubPartitionedView.class);

    /**
     * Ids of partitions for which {@code onRepartition} has already been signalled.
     * NOTE(review): the set is static, so it is shared by every view and pipeline
     * running in the same JVM - confirm that this is intended.
     */
    private static final Set<Integer> repartitioned = Collections.synchronizedSet(new HashSet<>());

    /** Pipeline options used to create a pipeline when the caller supplies no flow; not serialized. */
    private final transient PipelineOptions options;

    /** Partitioner that assigns each element its partition id. */
    private final Partitioner partitioner;

    /** Number of partitions exposed by {@link #getPartitions()}. */
    private final int numPartitions;

    /** Lateness handed to the source and to the ordering {@code DoFn}; zero selects the unwindowed path. */
    private final Duration orderingLateness;

    /** Interval handed to the ordering {@code DoFn} as its timer re-arm offset. */
    private final Duration orderingWindow;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/o2/proxima/storage/pubsub/PubSubPartitionedView$TimedDoFn.class */
    public static class TimedDoFn<T> extends DoFn<KV<Integer, AttributeData>, T> {
        // Entity used when converting buffered data to stream elements.
        private final EntityDescriptor entity;
        // Observer that receives repartition notifications and the emitted elements.
        private final PartitionedLogObserver<T> observer;
        // Offset applied when the timers are re-armed via setRelative().
        private final Duration fireInterval;
        // Milliseconds subtracted from the timer (or wall-clock) time to obtain the flush bound.
        private final long allowedLatenessMs;

        // State holding the partition id; written in process() and read back as the Partition id on flush.
        @DoFn.StateId("partition")
        final StateSpec<ValueState<Integer>> partitionSpec = StateSpecs.value();

        // State buffering elements until a timer flushes them in timestamp order.
        @DoFn.StateId("heap")
        final StateSpec<BagState<AttributeData>> heapSpec = StateSpecs.bag(new KryoCoder());

        // State holding the bound of the last flush that emitted data; older elements are dropped.
        @DoFn.StateId("watermark")
        final StateSpec<ValueState<Long>> watermark = StateSpecs.value();

        // Event-time timer; first armed in process().
        @DoFn.TimerId("fire-event")
        final TimerSpec fireEventTimeSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        // Processing-time timer; armed at the end of every flushSorted() call.
        @DoFn.TimerId("fire-processing")
        final TimerSpec fireProcessingTimeSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

        /**
         * Creates the ordering {@code DoFn}.
         *
         * @param entityDescriptor entity used to build stream elements from buffered data
         * @param partitionedLogObserver observer receiving the ordered elements
         * @param duration offset used when re-arming the flush timers
         * @param duration2 allowed lateness; stored in milliseconds
         */
        TimedDoFn(EntityDescriptor entityDescriptor, PartitionedLogObserver<T> partitionedLogObserver, Duration duration, Duration duration2) {
            this.allowedLatenessMs = duration2.toMillis();
            this.fireInterval = duration;
            this.observer = partitionedLogObserver;
            this.entity = entityDescriptor;
        }

        /**
         * Buffers the incoming element in the "heap" state.
         *
         * <p>The first time a partition id is seen (tracked by the static
         * {@code repartitioned} set) the observer is notified, the id is stored in
         * the "partition" state and the event-time timer is armed at the current
         * wall-clock instant.
         *
         * @param processContext context providing the keyed input element
         * @param bagState buffer of elements awaiting an ordered flush
         * @param valueState state receiving the partition id
         * @param timer event-time timer that starts the flushing
         */
        @DoFn.ProcessElement
        @SuppressFBWarnings({"UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"})
        public void process(DoFn<KV<Integer, AttributeData>, T>.ProcessContext processContext, @DoFn.StateId("heap") BagState<AttributeData> bagState, @DoFn.StateId("partition") ValueState<Integer> valueState, @DoFn.TimerId("fire-event") Timer timer) {
            final KV<Integer, AttributeData> input = processContext.element();
            final int partitionId = input.getKey();
            final boolean firstOccurrence = PubSubPartitionedView.repartitioned.add(partitionId);
            if (firstOccurrence) {
                this.observer.onRepartition(Arrays.asList(() -> partitionId));
                valueState.write(partitionId);
                final Instant startAt = Instant.now();
                timer.set(startAt);
                PubSubPartitionedView.log.info("Going to start processing time flushing after watermark reaches {}", startAt);
            }
            bagState.add(input.getValue());
        }

        /**
         * Event-time timer callback: flushes everything older than the timer's
         * timestamp minus the allowed lateness, then re-arms this timer relative
         * by {@code fireInterval}.
         *
         * @param onTimerContext context supplying the firing timestamp and the output
         * @param bagState buffer of elements awaiting an ordered flush
         * @param valueState state with the bound of the last emitting flush
         * @param valueState2 state with the partition id
         * @param timer the event-time timer itself
         * @param timer2 the processing-time timer, re-armed by {@code flushSorted}
         */
        @SuppressFBWarnings({"UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"})
        @DoFn.OnTimer("fire-event")
        public synchronized void onTimerEvent(DoFn<KV<Integer, AttributeData>, T>.OnTimerContext onTimerContext, @DoFn.StateId("heap") BagState<AttributeData> bagState, @DoFn.StateId("watermark") ValueState<Long> valueState, @DoFn.StateId("partition") ValueState<Integer> valueState2, @DoFn.TimerId("fire-event") Timer timer, @DoFn.TimerId("fire-processing") Timer timer2) {
            final long flushBoundMs = onTimerContext.timestamp().getMillis() - this.allowedLatenessMs;
            flushSorted(
                bagState.readLater(),
                valueState.readLater(),
                new Instant(flushBoundMs),
                valueState2.readLater(),
                timer2,
                onTimerContext::output);
            final org.joda.time.Duration rearmAfter = org.joda.time.Duration.millis(this.fireInterval.toMillis());
            timer.offset(rearmAfter).setRelative();
        }

        /**
         * Processing-time timer callback: flushes everything older than the
         * current wall-clock time minus the allowed lateness. The timer is
         * re-armed inside {@code flushSorted}.
         *
         * @param onTimerContext context supplying the output
         * @param bagState buffer of elements awaiting an ordered flush
         * @param valueState state with the bound of the last emitting flush
         * @param valueState2 state with the partition id
         * @param timer the processing-time timer itself
         */
        @SuppressFBWarnings({"UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"})
        @DoFn.OnTimer("fire-processing")
        public synchronized void onTimerProcessing(DoFn<KV<Integer, AttributeData>, T>.OnTimerContext onTimerContext, @DoFn.StateId("heap") BagState<AttributeData> bagState, @DoFn.StateId("watermark") ValueState<Long> valueState, @DoFn.StateId("partition") ValueState<Integer> valueState2, @DoFn.TimerId("fire-processing") Timer timer) {
            final long flushBoundMs = System.currentTimeMillis() - this.allowedLatenessMs;
            flushSorted(
                bagState.readLater(),
                valueState.readLater(),
                new Instant(flushBoundMs),
                valueState2.readLater(),
                timer,
                onTimerContext::output);
        }

        /**
         * Emits buffered elements older than {@code instant} to the observer in
         * ascending timestamp order and keeps the rest buffered.
         *
         * <p>Elements whose stamp is below the previously stored bound are logged
         * and dropped. When at least one element was selected for flushing, the
         * stored bound is advanced to {@code instant} and the buffer is rewritten
         * with only the retained elements. The given timer is always re-armed
         * relative by {@code fireInterval}.
         *
         * @param bagState buffer of elements awaiting an ordered flush
         * @param valueState state with the bound of the last emitting flush
         * @param instant flush bound; elements strictly older than it are flushed
         * @param valueState2 state with the partition id, read lazily per element
         * @param timer timer to re-arm once the flush is done
         * @param consumer sink for the outputs produced by the observer
         * @throws RuntimeException when the observer confirms an element as failed
         */
        void flushSorted(BagState<AttributeData> bagState, ValueState<Long> valueState, Instant instant, ValueState<Integer> valueState2, Timer timer, Consumer<T> consumer) {
            final List<AttributeData> retained = new ArrayList<>();
            final List<AttributeData> ripe = new ArrayList<>();
            for (AttributeData data : bagState.read()) {
                if (instant.isAfter(data.getStamp())) {
                    ripe.add(data);
                } else {
                    retained.add(data);
                }
            }
            final long lastBound = MoreObjects.firstNonNull(valueState.read(), Long.MIN_VALUE);
            // List.sort is stable, so elements with equal stamps keep their buffered order.
            ripe.sort((left, right) -> Long.compare(left.getStamp(), right.getStamp()));
            // A failed confirmation is escalated so that the bundle fails instead of losing data silently.
            final PartitionedLogObserver.ConfirmCallback confirmCallback = (succeeded, error) -> {
                if (!succeeded) {
                    throw new RuntimeException(error);
                }
            };
            for (AttributeData data : ripe) {
                if (data.getStamp() < lastBound) {
                    PubSubPartitionedView.log.warn("Dropping after-watermark data {}, watermark is {}", data, lastBound);
                    continue;
                }
                PubSubPartitionedView.toElement(this.entity, data).ifPresent(streamElement ->
                    this.observer.onNext(streamElement, confirmCallback, valueState2::read, consumer));
            }
            if (!ripe.isEmpty()) {
                valueState.write(instant.getMillis());
                // BagState has no selective removal: clear it and re-add what must stay buffered.
                bagState.clear();
                retained.forEach(bagState::add);
            }
            timer.offset(org.joda.time.Duration.millis(this.fireInterval.toMillis())).setRelative();
        }

        /**
         * Synthetic hook that rebuilds a serialized lambda declared in this class.
         *
         * <p>The first switch maps the implementation method name to a selector;
         * the second switch verifies the functional interface and implementation
         * signature and returns an equivalent lambda built from the captured
         * arguments.
         *
         * <p>NOTE(review): {@code boolean z = -1} and the repeated {@code case true:}
         * labels look like decompiler output rather than valid Java. This method is
         * presumably regenerated by javac for serializable lambdas and likely should
         * not be kept in hand-maintained source - confirm before editing.
         *
         * @param serializedLambda serialized form of the lambda to restore
         * @return the restored lambda
         * @throws IllegalArgumentException when no case matches the serialized form
         */
        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            // Step 1: translate the implementation method name into a selector.
            switch (implMethodName.hashCode()) {
                case -1220744517:
                    if (implMethodName.equals("lambda$process$7b2451c6$1")) {
                        z = 3;
                        break;
                    }
                    break;
                case -1005512447:
                    if (implMethodName.equals("output")) {
                        z = false;
                        break;
                    }
                    break;
                case 3496342:
                    if (implMethodName.equals("read")) {
                        z = true;
                        break;
                    }
                    break;
                case 624455021:
                    if (implMethodName.equals("lambda$null$4e295915$1")) {
                        z = 2;
                        break;
                    }
                    break;
            }
            // Step 2: verify the full signature and rebuild the matching lambda.
            switch (z) {
                case false:
                    // Consumer backed by OnTimerContext::output; the two branches below are identical.
                    if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Consumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/transforms/DoFn$WindowedContext") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)V")) {
                        DoFn.OnTimerContext onTimerContext = (DoFn.OnTimerContext) serializedLambda.getCapturedArg(0);
                        return onTimerContext::output;
                    }
                    if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Consumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/transforms/DoFn$WindowedContext") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)V")) {
                        DoFn.OnTimerContext onTimerContext2 = (DoFn.OnTimerContext) serializedLambda.getCapturedArg(0);
                        return onTimerContext2::output;
                    }
                    break;
                case true:
                    // Partition whose id is read from the captured ValueState.
                    if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/state/ReadableState") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                        ValueState valueState = (ValueState) serializedLambda.getCapturedArg(0);
                        return valueState::read;
                    }
                    break;
                case true:
                    // Confirm callback that throws when the confirmation is negative.
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/view/PartitionedLogObserver$ConfirmCallback") && serializedLambda.getFunctionalInterfaceMethodName().equals("confirm") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(ZLjava/lang/Throwable;)V") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/pubsub/PubSubPartitionedView$TimedDoFn") && serializedLambda.getImplMethodSignature().equals("(ZLjava/lang/Throwable;)V")) {
                        return (z2, th) -> {
                            if (!z2) {
                                throw new RuntimeException(th);
                            }
                        };
                    }
                    break;
                case true:
                    // Partition with a fixed, captured id.
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/pubsub/PubSubPartitionedView$TimedDoFn") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                        int intValue = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                        return () -> {
                            return intValue;
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /**
     * Empties the static set of partition ids already announced to observers,
     * so that the next element of each partition triggers {@code onRepartition} again.
     */
    @VisibleForTesting
    static void clearPartitioning() {
        PubSubPartitionedView.repartitioned.clear();
    }

    /**
     * Creates the view from the accessor's configuration.
     *
     * <p>NOTE: the decompiler reports that this constructor was originally
     * package-private.
     *
     * @param partitionedPubSubAccessor source of the partitioner, pipeline options,
     *     partition count and ordering settings
     * @param context context passed to the parent reader
     */
    public PubSubPartitionedView(PartitionedPubSubAccessor partitionedPubSubAccessor, Context context) {
        super(partitionedPubSubAccessor, context);
        this.options = partitionedPubSubAccessor.getOptions();
        this.partitioner = partitionedPubSubAccessor.getPartitioner();
        this.numPartitions = partitionedPubSubAccessor.getNumPartitions();
        this.orderingWindow = partitionedPubSubAccessor.getOrderingWindow();
        this.orderingLateness = partitionedPubSubAccessor.getOrderingLateness();
    }

    /**
     * Lists the partitions of this view.
     *
     * @return one {@link Partition} per id in {@code [0, numPartitions)}, in ascending order
     */
    public List<Partition> getPartitions() {
        // The explicit type witness gives the inner lambda its Partition target type,
        // which a raw List cast cannot provide.
        return IntStream.range(0, this.numPartitions)
            .<Partition>mapToObj(id -> () -> id)
            .collect(Collectors.toList());
    }

    /**
     * Observes the view in a fresh {@code BeamFlow} built from this view's pipeline options.
     *
     * @param collection partitions requested by the caller
     * @param partitionedLogObserver observer receiving the elements
     * @return dataset of the observer's outputs
     */
    public <T> Dataset<T> observePartitions(Collection<Partition> collection, PartitionedLogObserver<T> partitionedLogObserver) {
        final Pipeline pipeline = Pipeline.create(this.options);
        final Flow flow = BeamFlow.create(pipeline);
        return observePartitions(flow, collection, partitionedLogObserver);
    }

    /**
     * Observes the view inside the given flow, using the flow's name for the source.
     *
     * <p>NOTE(review): {@code collection} is not read by this method - confirm
     * that observing every partition regardless of the request is intended.
     *
     * @param flow flow to attach to; must be a {@code BeamFlow}
     * @param collection partitions requested by the caller
     * @param partitionedLogObserver observer receiving the elements
     * @return dataset of the observer's outputs
     * @throws UnsupportedOperationException when {@code flow} is not a {@code BeamFlow}
     */
    public <T> Dataset<T> observePartitions(Flow flow, Collection<Partition> collection, PartitionedLogObserver<T> partitionedLogObserver) {
        if (flow instanceof BeamFlow) {
            final BeamFlow beamFlow = (BeamFlow) flow;
            final EntityDescriptor entity = getEntityDescriptor();
            final Dataset<AttributeData> input = pubsubIO(beamFlow, this, flow.getName(), this.orderingLateness.toMillis());
            return applyObserver(
                entity,
                beamFlow.unwrapped(input),
                this.partitioner,
                this.numPartitions,
                partitionedLogObserver,
                this.orderingWindow,
                this.orderingLateness,
                beamFlow);
        }
        throw new UnsupportedOperationException("This view cooperates only with BeamFlow. Use euphoria-beam as executor and construct flow by BeamFlow#create");
    }

    /**
     * Observes the view under the given name in a fresh {@code BeamFlow} built
     * from this view's pipeline options.
     *
     * @param str name passed to the source
     * @param partitionedLogObserver observer receiving the elements
     * @return dataset of the observer's outputs
     */
    public <T> Dataset<T> observe(String str, PartitionedLogObserver<T> partitionedLogObserver) {
        final Pipeline pipeline = Pipeline.create(this.options);
        final Flow flow = BeamFlow.create(pipeline);
        return observe(flow, str, partitionedLogObserver);
    }

    /**
     * Observes the view under the given name inside the given flow.
     *
     * @param flow flow to attach to; must be a {@code BeamFlow}
     * @param str name passed to the source
     * @param partitionedLogObserver observer receiving the elements
     * @return dataset of the observer's outputs
     * @throws UnsupportedOperationException when {@code flow} is not a {@code BeamFlow}
     */
    public <T> Dataset<T> observe(Flow flow, String str, PartitionedLogObserver<T> partitionedLogObserver) {
        if (flow instanceof BeamFlow) {
            final BeamFlow beamFlow = (BeamFlow) flow;
            final EntityDescriptor entity = getEntityDescriptor();
            final Dataset<AttributeData> input = pubsubIO(beamFlow, this, str, this.orderingLateness.toMillis());
            return applyObserver(
                entity,
                beamFlow.unwrapped(input),
                this.partitioner,
                this.numPartitions,
                partitionedLogObserver,
                this.orderingWindow,
                this.orderingLateness,
                beamFlow);
        }
        throw new UnsupportedOperationException("This view cooperates only with BeamFlow. Use euphoria-beam as executor and construct flow by BeamFlow#create");
    }

    /**
     * Keys the input by partition id and applies the observer through a {@code ParDo}.
     *
     * @param entityDescriptor entity used to convert data to stream elements
     * @param pCollection input data
     * @param partitioner partitioner computing the partition id of each element
     * @param i number of partitions; ids are reduced modulo this value
     * @param partitionedLogObserver observer receiving the elements
     * @param duration interval passed to the timed {@code DoFn}
     * @param duration2 lateness; when zero the unwindowed {@code DoFn} is used instead
     * @param beamFlow flow used to wrap the resulting collection
     * @return dataset of the observer's outputs
     */
    private static <T> Dataset<T> applyObserver(EntityDescriptor entityDescriptor, PCollection<AttributeData> pCollection, Partitioner partitioner, int i, PartitionedLogObserver<T> partitionedLogObserver, Duration duration, Duration duration2, BeamFlow beamFlow) {
        // Key every element by its non-negative partition id modulo i; data that cannot be
        // converted to a stream element falls back to key 0.
        PCollection coder = pCollection.apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), new TypeDescriptor<AttributeData>() { // from class: cz.o2.proxima.storage.pubsub.PubSubPartitionedView.1
        })).via(attributeData -> {
            return KV.of(Integer.valueOf(((Integer) toElement(entityDescriptor, attributeData).map(streamElement -> {
                return Integer.valueOf((partitioner.getPartitionId(streamElement) & Integer.MAX_VALUE) % i);
            }).orElse(0)).intValue()), attributeData);
        })).setCoder(KvCoder.of(VarIntCoder.of(), new KryoCoder()));
        // A non-zero lateness selects the buffering, timer-driven DoFn; zero selects direct pass-through.
        return beamFlow.wrapped((!duration2.isZero() ? (PCollection) coder.apply(ParDo.of(toTimedDoFn(entityDescriptor, partitionedLogObserver, duration, duration2))) : coder.apply(ParDo.of(toUnwindowedDoFn(entityDescriptor, partitionedLogObserver)))).setCoder(new KryoCoder()));
    }

    /**
     * Creates the input dataset by reading the commit log through a {@code CommitLogSource}.
     *
     * @param beamFlow flow whose pipeline the source is applied to
     * @param commitLogReader reader backing the source
     * @param str name passed to the source
     * @param j value in milliseconds passed to the source (callers pass the ordering lateness)
     * @return dataset of the data read
     */
    @VisibleForTesting
    Dataset<AttributeData> pubsubIO(BeamFlow beamFlow, CommitLogReader commitLogReader, String str, long j) {
        final Pipeline pipeline = beamFlow.getPipeline();
        return beamFlow.wrapped(
            pipeline
                .apply(Read.from(CommitLogSource.of(commitLogReader, str, j)))
                .setCoder(new KryoCoder()));
    }

    /**
     * Parses the payload of a Pub/Sub message into {@code AttributeData}.
     *
     * @param pubsubMessage message whose payload holds a serialized {@code PubSub.KeyValue}
     * @return the parsed data, or empty when the payload cannot be parsed (a warning is logged)
     */
    @VisibleForTesting
    static Optional<AttributeData> toData(PubsubMessage pubsubMessage) {
        try {
            final PubSub.KeyValue kv = PubSub.KeyValue.parseFrom(pubsubMessage.getPayload());
            final AttributeData data = new AttributeData(
                kv.getKey(),
                kv.getAttribute(),
                kv.getValue().toByteArray(),
                kv.getDelete(),
                kv.getDeleteWildcard(),
                kv.getStamp());
            return Optional.of(data);
        } catch (InvalidProtocolBufferException e) {
            log.warn("Failed to parse message {}", pubsubMessage, e);
            return Optional.empty();
        }
    }

    /**
     * Creates a {@code DoFn} that hands every element straight to the observer,
     * without buffering or ordering.
     *
     * <p>No hand-written {@code $deserializeLambda$} is declared in the anonymous
     * class: javac generates that hook itself for serializable lambdas, and a
     * static method is not permitted in an anonymous class on older Java versions.
     *
     * @param entityDescriptor entity used to convert data to stream elements
     * @param partitionedLogObserver observer receiving the elements
     * @return the pass-through {@code DoFn}
     */
    private static <T> DoFn<KV<Integer, AttributeData>, T> toUnwindowedDoFn(final EntityDescriptor entityDescriptor, final PartitionedLogObserver<T> partitionedLogObserver) {
        return new DoFn<KV<Integer, AttributeData>, T>() {

            // The state is never read or written here; presumably it is declared only to make
            // the DoFn stateful so that elements are processed per key - TODO confirm.
            @DoFn.StateId("seq")
            final StateSpec<ValueState<Void>> seq = StateSpecs.value();

            /**
             * Notifies the observer the first time a partition id is seen, then
             * converts the element and passes it to the observer.
             *
             * @throws RuntimeException when the observer confirms the element as failed
             */
            @DoFn.ProcessElement
            @SuppressFBWarnings({"UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"})
            public void process(DoFn<KV<Integer, AttributeData>, T>.ProcessContext processContext, @DoFn.StateId("seq") ValueState<Void> valueState) {
                final KV<Integer, AttributeData> input = processContext.element();
                final int partitionId = input.getKey();
                if (PubSubPartitionedView.repartitioned.add(partitionId)) {
                    partitionedLogObserver.onRepartition(Arrays.asList(() -> partitionId));
                }
                PubSubPartitionedView.toElement(entityDescriptor, input.getValue()).ifPresent(streamElement -> {
                    // A failed confirmation is escalated so that the bundle fails.
                    PartitionedLogObserver.ConfirmCallback confirmCallback = (succeeded, error) -> {
                        if (!succeeded) {
                            throw new RuntimeException(error);
                        }
                    };
                    Partition partition = () -> partitionId;
                    partitionedLogObserver.onNext(streamElement, confirmCallback, partition, processContext::output);
                });
            }
        };
    }

    /**
     * Creates the buffering {@code DoFn} that emits elements ordered by timestamp.
     *
     * @param entityDescriptor entity used to convert data to stream elements
     * @param partitionedLogObserver observer receiving the elements
     * @param duration offset used when re-arming the flush timers
     * @param duration2 allowed lateness
     * @return a new {@code TimedDoFn}
     */
    private static <T> DoFn<KV<Integer, AttributeData>, T> toTimedDoFn(EntityDescriptor entityDescriptor, PartitionedLogObserver<T> partitionedLogObserver, Duration duration, Duration duration2) {
        // Diamond instead of the raw type keeps the observer's element type checked.
        return new TimedDoFn<>(entityDescriptor, partitionedLogObserver, duration, duration2);
    }

    /**
     * Converts {@code AttributeData} to a {@code StreamElement} of the given entity.
     *
     * <p>A wildcard delete takes precedence over a plain delete; anything else
     * becomes an update. Every element gets a fresh random UUID.
     *
     * @param entityDescriptor entity the attribute is looked up in
     * @param attributeData data to convert
     * @return the element, or empty when the entity has no such attribute (a warning is logged)
     */
    static Optional<StreamElement> toElement(EntityDescriptor entityDescriptor, AttributeData attributeData) {
        final long stamp = attributeData.getStamp();
        final String uuid = UUID.randomUUID().toString();
        final Optional lookup = entityDescriptor.findAttribute(attributeData.getAttribute());
        if (!lookup.isPresent()) {
            log.warn("Missing attribute {} of entity {}", attributeData.getAttribute(), entityDescriptor);
            return Optional.empty();
        }
        final AttributeDescriptor descriptor = (AttributeDescriptor) lookup.get();
        final StreamElement element;
        if (attributeData.isDeleteWildcard()) {
            element = StreamElement.deleteWildcard(entityDescriptor, descriptor, uuid, attributeData.getKey(), attributeData.getAttribute(), stamp);
        } else if (attributeData.isDelete()) {
            element = StreamElement.delete(entityDescriptor, descriptor, uuid, attributeData.getKey(), attributeData.getAttribute(), stamp);
        } else {
            element = StreamElement.update(entityDescriptor, descriptor, uuid, attributeData.getKey(), attributeData.getAttribute(), stamp, attributeData.getValue());
        }
        return Optional.of(element);
    }

    /**
     * Synthetic hook that rebuilds a serialized lambda declared in this class.
     *
     * <p>Two lambdas are handled: the partition-keying function used in
     * {@code applyObserver} and a {@code Partition} with a fixed, captured id.
     *
     * <p>NOTE(review): {@code boolean z = -1} looks like decompiler output rather
     * than valid Java. This method is presumably regenerated by javac for
     * serializable lambdas and likely should not be kept in hand-maintained
     * source - confirm before editing.
     *
     * @param serializedLambda serialized form of the lambda to restore
     * @return the restored lambda
     * @throws IllegalArgumentException when no case matches the serialized form
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // Step 1: translate the implementation method name into a selector.
        switch (implMethodName.hashCode()) {
            case -332336284:
                if (implMethodName.equals("lambda$applyObserver$d1ef95b2$1")) {
                    z = false;
                    break;
                }
                break;
            case 1649082284:
                if (implMethodName.equals("lambda$null$74d5a673$1")) {
                    z = true;
                    break;
                }
                break;
        }
        // Step 2: verify the full signature and rebuild the matching lambda.
        switch (z) {
            case false:
                // Keying function: captures the entity, the partitioner and the partition count.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/pubsub/PubSubPartitionedView") && serializedLambda.getImplMethodSignature().equals("(Lcz/o2/proxima/repository/EntityDescriptor;Lcz/o2/proxima/storage/commitlog/Partitioner;ILcz/o2/proxima/storage/pubsub/AttributeData;)Lorg/apache/beam/sdk/values/KV;")) {
                    EntityDescriptor entityDescriptor = (EntityDescriptor) serializedLambda.getCapturedArg(0);
                    Partitioner partitioner = (Partitioner) serializedLambda.getCapturedArg(1);
                    int intValue = ((Integer) serializedLambda.getCapturedArg(2)).intValue();
                    return attributeData -> {
                        return KV.of(Integer.valueOf(((Integer) toElement(entityDescriptor, attributeData).map(streamElement -> {
                            return Integer.valueOf((partitioner.getPartitionId(streamElement) & Integer.MAX_VALUE) % intValue);
                        }).orElse(0)).intValue()), attributeData);
                    };
                }
                break;
            case true:
                // Partition with a fixed, captured id.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/pubsub/PubSubPartitionedView") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue2 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue2;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
