package cz.o2.proxima.beam.io.pubsub;

import cz.o2.proxima.beam.core.DataAccessor;
import cz.o2.proxima.beam.core.io.StreamElementCoder;
import cz.o2.proxima.internal.shaded.com.google.common.base.Preconditions;
import cz.o2.proxima.io.pubsub.util.PubSubUtils;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.repository.RepositoryFactory;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.UriUtil;
import cz.o2.proxima.storage.commitlog.Position;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import lombok.Generated;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.AssignEventTime;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FlatMap;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/beam/io/pubsub/PubSubDataAccessor.class */
public class PubSubDataAccessor implements DataAccessor {

    @Generated
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PubSubDataAccessor.class);
    private static final long serialVersionUID = 1;
    private final RepositoryFactory repoFactory;
    private final EntityDescriptor entity;
    private final URI uri;
    private final String topic;

    /* JADX INFO: Access modifiers changed from: package-private */
    public PubSubDataAccessor(Repository repository, EntityDescriptor entityDescriptor, URI uri) {
        this.repoFactory = repository.asFactory();
        this.entity = entityDescriptor;
        this.uri = uri;
        String authority = uri.getAuthority();
        String pathNormalized = UriUtil.getPathNormalized(uri);
        Preconditions.checkArgument(!authority.isEmpty(), "Authority in URI %s must not be empty", uri);
        Preconditions.checkArgument(!pathNormalized.isEmpty(), "Path in URI %s must specify topic", uri);
        this.topic = String.format("projects/%s/topics/%s", authority, pathNormalized);
    }

    public PCollection<StreamElement> createStream(String str, Pipeline pipeline, Position position, boolean z, boolean z2, long j) {
        PCollection<StreamElement> coder = FlatMap.of(pipeline.apply(PubsubIO.readMessages().fromTopic(this.topic))).using((pubsubMessage, collector) -> {
            Optional<StreamElement> streamElement = PubSubUtils.toStreamElement(this.entity, pubsubMessage.getMessageId(), pubsubMessage.getPayload());
            Objects.requireNonNull(collector);
            streamElement.ifPresent((v1) -> {
                r1.collect(v1);
            });
        }, TypeDescriptor.of(StreamElement.class)).output().setCoder(StreamElementCoder.of(this.repoFactory));
        return z2 ? AssignEventTime.of(coder).using((v0) -> {
            return v0.getStamp();
        }, Duration.millis(Long.MAX_VALUE)).output().setCoder(coder.getCoder()) : coder;
    }

    public PCollection<StreamElement> createBatch(Pipeline pipeline, List<AttributeDescriptor<?>> list, long j, long j2) {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    public PCollection<StreamElement> createStreamFromUpdates(Pipeline pipeline, List<AttributeDescriptor<?>> list, long j, long j2, long j3) {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    @Generated
    public URI getUri() {
        return this.uri;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 154469144:
                if (implMethodName.equals("lambda$createStream$cb0187de$1")) {
                    z = true;
                    break;
                }
                break;
            case 1965582861:
                if (implMethodName.equals("getStamp")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/extensions/euphoria/core/client/functional/ExtractEventTime") && serializedLambda.getFunctionalInterfaceMethodName().equals("extractTimestamp") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/StreamElement") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.getStamp();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/extensions/euphoria/core/client/functional/UnaryFunctor") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Lorg/apache/beam/sdk/extensions/euphoria/core/client/io/Collector;)V") && serializedLambda.getImplClass().equals("cz/o2/proxima/beam/io/pubsub/PubSubDataAccessor") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/gcp/pubsub/PubsubMessage;Lorg/apache/beam/sdk/extensions/euphoria/core/client/io/Collector;)V")) {
                    PubSubDataAccessor pubSubDataAccessor = (PubSubDataAccessor) serializedLambda.getCapturedArg(0);
                    return (pubsubMessage, collector) -> {
                        Optional<StreamElement> streamElement = PubSubUtils.toStreamElement(this.entity, pubsubMessage.getMessageId(), pubsubMessage.getPayload());
                        Objects.requireNonNull(collector);
                        streamElement.ifPresent((v1) -> {
                            r1.collect(v1);
                        });
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
