package co.cask.cdap.data2.transaction.stream.inmemory;

import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data2.queue.ConsumerConfig;
import co.cask.cdap.data2.queue.QueueClientFactory;
import co.cask.cdap.data2.transaction.queue.inmemory.InMemoryQueueService;
import co.cask.cdap.data2.transaction.stream.QueueToStreamConsumer;
import co.cask.cdap.data2.transaction.stream.StreamConsumer;
import co.cask.cdap.data2.transaction.stream.StreamConsumerFactory;
import co.cask.cdap.proto.id.FlowId;
import co.cask.cdap.proto.id.StreamId;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.Iterator;

/* loaded from: input_file:co/cask/cdap/data2/transaction/stream/inmemory/InMemoryStreamConsumerFactory.class */
/**
 * {@link StreamConsumerFactory} implementation that works on top of in-memory queues.
 *
 * <p>Consumers are produced by wrapping a queue consumer, obtained from the injected
 * {@link QueueClientFactory}, in a {@link QueueToStreamConsumer}. Dropping consumers is
 * implemented by truncating queues through the injected {@link InMemoryQueueService}.
 */
public final class InMemoryStreamConsumerFactory implements StreamConsumerFactory {
    /**
     * Template for the error raised when the namespace string is malformed. The single
     * {@code %s} placeholder is filled with the offending namespace string.
     */
    private static final String INVALID_NAMESPACE_TEMPLATE = "Namespace string %s must be of the form <app>.<flow>";

    /** Splits a namespace string into its dot-separated segments. */
    private static final Splitter NAMESPACE_SPLITTER = Splitter.on('.');

    private final QueueClientFactory queueClientFactory;
    private final InMemoryQueueService queueService;

    /**
     * Creates the factory from its injected collaborators.
     *
     * @param queueClientFactory factory used to create the underlying queue consumers
     * @param inMemoryQueueService queue service used to truncate queues in {@link #dropAll}
     */
    @Inject
    public InMemoryStreamConsumerFactory(QueueClientFactory queueClientFactory, InMemoryQueueService inMemoryQueueService) {
        this.queueClientFactory = queueClientFactory;
        this.queueService = inMemoryQueueService;
    }

    /**
     * Creates a stream consumer backed by a queue consumer for the queue derived from the stream.
     *
     * @param streamId stream to consume from; also determines the queue name via
     *     {@link QueueName#fromStream}
     * @param str namespace string; not used by this implementation
     * @param consumerConfig consumer configuration, passed both to the queue consumer and to the
     *     returned wrapper
     * @return a {@link QueueToStreamConsumer} wrapping the newly created queue consumer
     * @throws IOException as declared by {@link StreamConsumerFactory}
     */
    @Override
    public StreamConsumer create(StreamId streamId, String str, ConsumerConfig consumerConfig) throws IOException {
        QueueName queueName = QueueName.fromStream(streamId);
        // NOTE(review): -1 is forwarded as the third argument of createConsumer; presumably it
        // means "number of groups unknown" -- confirm against QueueClientFactory.
        return new QueueToStreamConsumer(
            streamId, consumerConfig, this.queueClientFactory.createConsumer(queueName, consumerConfig, -1));
    }

    /**
     * Truncates all in-memory queues whose name starts with the prefix of the flow identified by
     * the stream's namespace and the given {@code <app>.<flow>} string.
     *
     * <p>Only the first two dot-separated segments of {@code str} are read; any further segments
     * are ignored.
     *
     * @param streamId stream whose namespace is used to build the {@link FlowId}
     * @param str namespace string of the form {@code <app>.<flow>}
     * @param iterable consumer group ids; not used by this implementation
     * @throws IllegalArgumentException if {@code str} has fewer than two dot-separated segments
     * @throws IOException as declared by {@link StreamConsumerFactory}
     */
    @Override
    public void dropAll(StreamId streamId, String str, Iterable<Long> iterable) throws IOException {
        Iterator<String> segments = NAMESPACE_SPLITTER.split(str).iterator();

        // The message is only formatted by checkArgument when a check actually fails.
        Preconditions.checkArgument(segments.hasNext(), INVALID_NAMESPACE_TEMPLATE, str);
        String appId = segments.next();
        Preconditions.checkArgument(segments.hasNext(), INVALID_NAMESPACE_TEMPLATE, str);
        String flowId = segments.next();

        FlowId flow = new FlowId(streamId.getNamespace(), appId, flowId);
        this.queueService.truncateAllWithPrefix(QueueName.prefixForFlow(flow));
    }
}
