package cz.seznam.euphoria.beam;

import cz.seznam.euphoria.beam.window.BeamWindowFn;
import cz.seznam.euphoria.core.client.accumulators.AccumulatorProvider;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.functional.ReduceFunctor;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.client.operator.ReduceByKey;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.shadow.com.google.common.collect.Streams;
import java.lang.invoke.SerializedLambda;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/* loaded from: input_file:cz/seznam/euphoria/beam/ReduceByKeyTranslator.class */
class ReduceByKeyTranslator implements OperatorTranslator<ReduceByKey> {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/seznam/euphoria/beam/ReduceByKeyTranslator$ReduceDoFn.class */
    public static class ReduceDoFn<K, V, O> extends DoFn<KV<K, Iterable<V>>, Pair<K, O>> {
        private final ReduceFunctor<V, O> reducer;
        private final AccumulatorProvider accumulators;

        ReduceDoFn(ReduceFunctor<V, O> reduceFunctor, AccumulatorProvider accumulatorProvider) {
            this.reducer = reduceFunctor;
            this.accumulators = accumulatorProvider;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KV<K, Iterable<V>>, Pair<K, O>>.ProcessContext processContext) {
            this.reducer.apply(StreamSupport.stream(((Iterable) ((KV) processContext.element()).getValue()).spliterator(), false), new DoFnCollector(this.accumulators, obj -> {
                processContext.output(Pair.of(((KV) processContext.element()).getKey(), obj));
            }));
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case -1986993027:
                    if (implMethodName.equals("lambda$processElement$7ab5e509$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/seznam/euphoria/core/client/functional/Consumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("cz/seznam/euphoria/beam/ReduceByKeyTranslator$ReduceDoFn") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/transforms/DoFn$ProcessContext;Ljava/lang/Object;)V")) {
                        DoFn.ProcessContext processContext = (DoFn.ProcessContext) serializedLambda.getCapturedArg(0);
                        return obj -> {
                            processContext.output(Pair.of(((KV) processContext.element()).getKey(), obj));
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    @Override // cz.seznam.euphoria.beam.OperatorTranslator
    public PCollection<?> translate(ReduceByKey reduceByKey, BeamExecutorContext beamExecutorContext) {
        return doTranslate(reduceByKey, beamExecutorContext);
    }

    private static <IN, KEY, VALUE, OUT, W extends Window<W>> PCollection<Pair<KEY, OUT>> doTranslate(ReduceByKey<IN, KEY, VALUE, OUT, W> reduceByKey, BeamExecutorContext beamExecutorContext) {
        final UnaryFunction<IN, KEY> keyExtractor = reduceByKey.getKeyExtractor();
        final UnaryFunction<IN, VALUE> valueExtractor = reduceByKey.getValueExtractor();
        ReduceFunctor<VALUE, OUT> reducer = reduceByKey.getReducer();
        Coder<OUT> coder = beamExecutorContext.getCoder(keyExtractor);
        Coder<OUT> coder2 = beamExecutorContext.getCoder(valueExtractor);
        PCollection coder3 = (reduceByKey.getWindowing() == null ? beamExecutorContext.getInput(reduceByKey) : beamExecutorContext.getInput(reduceByKey).apply(org.apache.beam.sdk.transforms.windowing.Window.into(BeamWindowFn.wrap(reduceByKey.getWindowing())).triggering(AfterWatermark.pastEndOfWindow()).discardingFiredPanes().withAllowedLateness(beamExecutorContext.getAllowedLateness(reduceByKey)))).apply(MapElements.via(new SimpleFunction<IN, KV<KEY, VALUE>>() { // from class: cz.seznam.euphoria.beam.ReduceByKeyTranslator.1
            public KV<KEY, VALUE> apply(IN in) {
                return KV.of(UnaryFunction.this.apply(in), valueExtractor.apply(in));
            }

            /* JADX WARN: Multi-variable type inference failed */
            /* renamed from: apply, reason: collision with other method in class */
            public /* bridge */ /* synthetic */ Object m1112apply(Object obj) {
                return apply((AnonymousClass1<IN, KEY, VALUE>) obj);
            }
        })).setCoder(KvCoder.of(coder, coder2));
        if (reduceByKey.isCombinable()) {
            return coder3.apply(Combine.perKey(asCombiner(reducer))).apply(MapElements.via(new SimpleFunction<KV<KEY, VALUE>, Pair<KEY, VALUE>>() { // from class: cz.seznam.euphoria.beam.ReduceByKeyTranslator.2
                public Pair<KEY, VALUE> apply(KV<KEY, VALUE> kv) {
                    return Pair.of(kv.getKey(), kv.getValue());
                }
            }));
        }
        return coder3.apply(GroupByKey.create()).setCoder(KvCoder.of(coder, IterableCoder.of(coder2))).apply(ParDo.of(new ReduceDoFn(reducer, new LazyAccumulatorProvider(beamExecutorContext.getAccumulatorFactory(), beamExecutorContext.getSettings()))));
    }

    private static <IN, OUT> SerializableFunction<Iterable<IN>, IN> asCombiner(ReduceFunctor<IN, OUT> reduceFunctor) {
        CombinableCollector combinableCollector = new CombinableCollector();
        return iterable -> {
            reduceFunctor.apply(Streams.stream(iterable), combinableCollector);
            return combinableCollector.get();
        };
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 272870545:
                if (implMethodName.equals("lambda$asCombiner$87674ef7$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/seznam/euphoria/beam/ReduceByKeyTranslator") && serializedLambda.getImplMethodSignature().equals("(Lcz/seznam/euphoria/core/client/functional/ReduceFunctor;Lcz/seznam/euphoria/beam/CombinableCollector;Ljava/lang/Iterable;)Ljava/lang/Object;")) {
                    ReduceFunctor reduceFunctor = (ReduceFunctor) serializedLambda.getCapturedArg(0);
                    CombinableCollector combinableCollector = (CombinableCollector) serializedLambda.getCapturedArg(1);
                    return iterable -> {
                        reduceFunctor.apply(Streams.stream(iterable), combinableCollector);
                        return combinableCollector.get();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
