package org.springframework.integration.mongodb.inbound;

import org.bson.Document;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mongodb.support.MongoHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/integration/mongodb/inbound/MongoDbChangeStreamMessageProducer.class */
/**
 * A message producer that turns MongoDB change stream events into messages.
 *
 * <p>On start, a change stream is requested from the supplied
 * {@link ReactiveMongoOperations} using the configured collection, options and
 * domain type. Each emitted event is mapped to a message and the resulting
 * publisher is handed to {@code subscribeToPublisher(...)} inherited from
 * {@link MessageProducerSupport}.
 *
 * <p>Every produced message carries these headers, populated from the event:
 * {@link MongoHeaders#COLLECTION_NAME}, {@link MongoHeaders#CHANGE_STREAM_OPERATION_TYPE},
 * {@link MongoHeaders#CHANGE_STREAM_TIMESTAMP} and {@link MongoHeaders#CHANGE_STREAM_RESUME_TOKEN}.
 */
public class MongoDbChangeStreamMessageProducer extends MessageProducerSupport {

    /** Source of the change stream; never {@code null}. */
    private final ReactiveMongoOperations mongoOperations;

    /** Collection name passed to {@code changeStream(...)}; may be {@code null}. */
    @Nullable
    private String collection;

    /** Type passed to {@code changeStream(...)} for the event body; defaults to {@link Document}. */
    private Class<?> domainType = Document.class;

    /** Change stream options; defaults to {@link ChangeStreamOptions#empty()}. */
    private ChangeStreamOptions options = ChangeStreamOptions.empty();

    /** Whether the event body (when present) is used as the message payload; defaults to {@code true}. */
    private boolean extractBody = true;

    /**
     * Create a producer backed by the given reactive operations.
     *
     * @param reactiveMongoOperations the operations used to open the change stream
     * @throws IllegalArgumentException if {@code reactiveMongoOperations} is {@code null}
     */
    public MongoDbChangeStreamMessageProducer(ReactiveMongoOperations reactiveMongoOperations) {
        Assert.notNull(reactiveMongoOperations, "'mongoOperations' must not be null");
        this.mongoOperations = reactiveMongoOperations;
    }

    /**
     * Set the domain type handed to the change stream request.
     *
     * @param cls the domain type; must not be {@code null}
     * @throws IllegalArgumentException if {@code cls} is {@code null}
     */
    public void setDomainType(Class<?> cls) {
        Assert.notNull(cls, "'domainType' must not be null");
        this.domainType = cls;
    }

    /**
     * Set the collection name handed to the change stream request.
     * No validation is performed here, so {@code null} is accepted and stored as-is.
     *
     * @param str the collection name, possibly {@code null}
     */
    public void setCollection(String str) {
        this.collection = str;
    }

    /**
     * Set the options handed to the change stream request.
     *
     * @param changeStreamOptions the options; must not be {@code null}
     * @throws IllegalArgumentException if {@code changeStreamOptions} is {@code null}
     */
    public void setOptions(ChangeStreamOptions changeStreamOptions) {
        Assert.notNull(changeStreamOptions, "'options' must not be null");
        this.options = changeStreamOptions;
    }

    /**
     * Choose what becomes the message payload.
     *
     * @param z {@code true} to use the event body as the payload when it is non-null;
     * {@code false} to always use the whole change stream event
     */
    public void setExtractBody(boolean z) {
        this.extractBody = z;
    }

    /**
     * Return the component type identifier of this adapter.
     *
     * @return the constant {@code "mongo:change-stream-inbound-channel-adapter"}
     */
    public String getComponentType() {
        return "mongo:change-stream-inbound-channel-adapter";
    }

    /**
     * Request the change stream, map every event to a message and pass the
     * resulting publisher to {@code subscribeToPublisher(...)}.
     */
    protected void doStart() {
        subscribeToPublisher(
                this.mongoOperations
                        .changeStream(this.collection, this.options, this.domainType)
                        .map(event -> {
                            // The body is only consulted when extraction is enabled;
                            // a missing body falls back to the whole event.
                            boolean useBody = this.extractBody && event.getBody() != null;
                            Object payload = useBody ? event.getBody() : event;
                            return MessageBuilder.withPayload(payload)
                                    .setHeader(MongoHeaders.COLLECTION_NAME, event.getCollectionName())
                                    .setHeader(MongoHeaders.CHANGE_STREAM_OPERATION_TYPE, event.getOperationType())
                                    .setHeader(MongoHeaders.CHANGE_STREAM_TIMESTAMP, event.getTimestamp())
                                    .setHeader(MongoHeaders.CHANGE_STREAM_RESUME_TOKEN, event.getResumeToken())
                                    .build();
                        }));
    }
}
