package cz.seznam.euphoria.beam.io;

import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.io.Writer;
import java.io.IOException;
import java.util.Objects;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

@DoFn.BoundedPerElement
/* loaded from: input_file:cz/seznam/euphoria/beam/io/BeamWriteSink.class */
public class BeamWriteSink<T> extends PTransform<PCollection<T>, PDone> {
    private final DataSink<T> sink;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/seznam/euphoria/beam/io/BeamWriteSink$WriteFn.class */
    public static final class WriteFn<T> extends DoFn<T, Void> {
        private final DataSink<T> sink;
        private final int partitionId;
        Writer<T> writer = null;

        WriteFn(int i, DataSink<T> dataSink) {
            this.partitionId = i;
            this.sink = dataSink;
        }

        @DoFn.Setup
        public void setup() {
            this.writer = this.sink.openWriter(this.partitionId);
        }

        /* JADX WARN: Multi-variable type inference failed */
        @DoFn.ProcessElement
        public void processElement(DoFn<T, Void>.ProcessContext processContext, BoundedWindow boundedWindow) throws IOException {
            this.writer.write(processContext.element());
        }

        @DoFn.FinishBundle
        public void finishBundle() throws Exception {
            this.writer.flush();
        }

        @DoFn.Teardown
        public void tearDown() throws IOException {
            this.writer.commit();
            this.writer.close();
        }
    }

    public static <T> BeamWriteSink<T> wrap(DataSink<T> dataSink) {
        return new BeamWriteSink<>(dataSink);
    }

    private BeamWriteSink(DataSink<T> dataSink) {
        this.sink = (DataSink) Objects.requireNonNull(dataSink);
    }

    public PDone expand(PCollection<T> pCollection) {
        pCollection.apply(ParDo.of(new WriteFn(0, this.sink)));
        return PDone.in(pCollection.getPipeline());
    }
}
