package co.cask.cdap.data2.transaction.stream;

import co.cask.cdap.api.data.schema.SchemaHash;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.common.io.BinaryDecoder;
import co.cask.cdap.common.stream.StreamEventCodec;
import co.cask.cdap.common.stream.StreamEventDataCodec;
import co.cask.cdap.data2.queue.ConsumerConfig;
import co.cask.cdap.data2.queue.DequeueResult;
import co.cask.cdap.data2.queue.QueueConsumer;
import co.cask.cdap.proto.Id;
import co.cask.common.io.ByteBufferInputStream;
import co.cask.tephra.Transaction;
import co.cask.tephra.TransactionAware;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:co/cask/cdap/data2/transaction/stream/QueueToStreamConsumer.class */
public final class QueueToStreamConsumer implements StreamConsumer {
    private static final StreamEventCodec STREAM_EVENT_CODEC = new StreamEventCodec();
    private final Id.Stream streamId;
    private final ConsumerConfig consumerConfig;
    private final QueueConsumer consumer;

    public QueueToStreamConsumer(Id.Stream stream, ConsumerConfig consumerConfig, QueueConsumer queueConsumer) {
        this.streamId = stream;
        this.consumerConfig = consumerConfig;
        this.consumer = queueConsumer;
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamConsumer
    public Id.Stream getStreamId() {
        return this.streamId;
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamConsumer
    public ConsumerConfig getConsumerConfig() {
        return this.consumerConfig;
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamConsumer
    public DequeueResult<StreamEvent> poll(int i, long j, TimeUnit timeUnit) throws IOException, InterruptedException {
        final DequeueResult<byte[]> dequeue = this.consumer.dequeue(i);
        ImmutableList.Builder builder = ImmutableList.builder();
        for (byte[] bArr : dequeue) {
            try {
                builder.add(STREAM_EVENT_CODEC.decodePayload(bArr));
            } catch (Throwable th) {
                ByteBuffer wrap = ByteBuffer.wrap(bArr);
                Preconditions.checkArgument(new SchemaHash(wrap).equals(StreamEventDataCodec.STREAM_DATA_SCHEMA.getSchemaHash()), "Schema from payload not matching with StreamEventData schema.");
                builder.add(new StreamEvent(StreamEventDataCodec.decode(new BinaryDecoder(new ByteBufferInputStream(wrap))), 0L));
            }
        }
        final ImmutableList build = builder.build();
        return new DequeueResult<StreamEvent>() { // from class: co.cask.cdap.data2.transaction.stream.QueueToStreamConsumer.1
            @Override // co.cask.cdap.data2.queue.DequeueResult
            public boolean isEmpty() {
                return build.isEmpty();
            }

            @Override // co.cask.cdap.data2.queue.DequeueResult
            public void reclaim() {
                dequeue.reclaim();
            }

            @Override // co.cask.cdap.data2.queue.DequeueResult
            public int size() {
                return build.size();
            }

            @Override // java.lang.Iterable
            public Iterator<StreamEvent> iterator() {
                return build.iterator();
            }
        };
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.consumer.close();
    }

    public void startTx(Transaction transaction) {
        if (this.consumer instanceof TransactionAware) {
            this.consumer.startTx(transaction);
        }
    }

    public Collection<byte[]> getTxChanges() {
        return this.consumer instanceof TransactionAware ? this.consumer.getTxChanges() : ImmutableList.of();
    }

    public boolean commitTx() throws Exception {
        if (this.consumer instanceof TransactionAware) {
            return this.consumer.commitTx();
        }
        return true;
    }

    public void postTxCommit() {
        if (this.consumer instanceof TransactionAware) {
            this.consumer.postTxCommit();
        }
    }

    public boolean rollbackTx() throws Exception {
        if (this.consumer instanceof TransactionAware) {
            return this.consumer.rollbackTx();
        }
        return true;
    }

    public String getTransactionAwareName() {
        return Objects.toStringHelper(this).add("queue", this.streamId).add("config", this.consumerConfig).toString();
    }
}
