package org.springframework.integration.mongodb.inbound;

import org.reactivestreams.Publisher;
import org.springframework.context.ApplicationContext;
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.util.Pair;
import org.springframework.expression.Expression;
import org.springframework.integration.mongodb.support.MongoHeaders;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/springframework/integration/mongodb/inbound/ReactiveMongoDbMessageSource.class */
/**
 * Message source that queries MongoDB through {@link ReactiveMongoOperations} and
 * exposes the query result as a {@link Publisher} payload.
 *
 * <p>Depending on {@code isExpectSingleResult()}, the payload is produced by
 * {@code findOne(...)} or by {@code find(...)}. When an update expression evaluates to a
 * non-null {@link Update}, the fetched entities are additionally updated (matched by
 * their ids) before they are emitted downstream.
 *
 * <p>The instance is backed either by a {@link ReactiveMongoDatabaseFactory}, from which a
 * {@link ReactiveMongoTemplate} is created in {@link #onInit()}, or by a ready-to-use
 * {@link ReactiveMongoOperations} supplied by the caller.
 */
public class ReactiveMongoDbMessageSource extends AbstractMongoDbMessageSource<Publisher<?>> {

    /** Factory used to build the template in {@link #onInit()}; {@code null} when operations were supplied directly. */
    @Nullable
    private final ReactiveMongoDatabaseFactory reactiveMongoDatabaseFactory;

    /** Operations used for all queries and updates; assigned by the constructor or by {@link #onInit()}. */
    private ReactiveMongoOperations reactiveMongoTemplate;

    /**
     * Create an instance that builds its own {@link ReactiveMongoTemplate} during initialization.
     *
     * @param reactiveMongoDatabaseFactory the factory to create the template from; must not be {@code null}
     * @param expression the expression handed over to the superclass constructor
     */
    public ReactiveMongoDbMessageSource(ReactiveMongoDatabaseFactory reactiveMongoDatabaseFactory, Expression expression) {
        super(expression);
        Assert.notNull(reactiveMongoDatabaseFactory, "'reactiveMongoDatabaseFactory' must not be null");
        this.reactiveMongoDatabaseFactory = reactiveMongoDatabaseFactory;
    }

    /**
     * Create an instance that uses the provided {@link ReactiveMongoOperations} as is.
     *
     * @param reactiveMongoOperations the operations to query and update with; must not be {@code null}
     * @param expression the expression handed over to the superclass constructor
     */
    public ReactiveMongoDbMessageSource(ReactiveMongoOperations reactiveMongoOperations, Expression expression) {
        super(expression);
        Assert.notNull(reactiveMongoOperations, "'reactiveMongoTemplate' must not be null");
        this.reactiveMongoDatabaseFactory = null;
        this.reactiveMongoTemplate = reactiveMongoOperations;
    }

    /**
     * @return the component type identifier of this adapter
     */
    public String getComponentType() {
        return "mongo:reactive-inbound-channel-adapter";
    }

    /**
     * Finish initialization: when a database factory was supplied, create a
     * {@link ReactiveMongoTemplate} from it (with the currently configured converter and,
     * if available, the application context). In both construction modes the converter of
     * the effective operations is then propagated back via {@code setMongoConverter(...)}
     * and the source is marked as initialized.
     */
    @Override // org.springframework.integration.mongodb.inbound.AbstractMongoDbMessageSource
    protected void onInit() {
        super.onInit();
        if (this.reactiveMongoDatabaseFactory != null) {
            ReactiveMongoTemplate template =
                    new ReactiveMongoTemplate(this.reactiveMongoDatabaseFactory, getMongoConverter());
            ApplicationContext applicationContext = getApplicationContext();
            if (applicationContext != null) {
                template.setApplicationContext(applicationContext);
            }
            this.reactiveMongoTemplate = template;
        }
        setMongoConverter(this.reactiveMongoTemplate.getConverter());
        setInitialized(true);
    }

    /**
     * Evaluate the query and collection name expressions and wrap the resulting
     * {@link Publisher} (a single-result or a multi-result find, optionally followed by an
     * update of the found entities) as the payload of a new message builder.
     *
     * @return the builder obtained from the message builder factory, carrying the publisher
     * as payload and the collection name in the {@link MongoHeaders#COLLECTION_NAME} header
     * @throws IllegalArgumentException if this source has not been initialized yet
     */
    public Object doReceive() {
        Assert.isTrue(isInitialized(), "This class is not yet initialized. Invoke its afterPropertiesSet() method");
        Query query = evaluateQueryExpression();
        String collectionName = evaluateCollectionNameExpression();

        Publisher<?> result;
        if (isExpectSingleResult()) {
            result = this.reactiveMongoTemplate.findOne(query, getEntityClass(), collectionName);
        }
        else {
            result = this.reactiveMongoTemplate.find(query, getEntityClass(), collectionName);
        }

        return getMessageBuilderFactory()
                .withPayload(updateIfAny(result, collectionName))
                .setHeader(MongoHeaders.COLLECTION_NAME, collectionName);
    }

    /**
     * Decorate the find result with an update step when an update is configured.
     *
     * @param result the publisher produced by the find operation
     * @param collectionName the collection the entities were read from
     * @return {@code result} unchanged when the update expression evaluates to {@code null};
     * otherwise a publisher that updates the found entities before emitting them
     */
    private Publisher<?> updateIfAny(Publisher<?> result, String collectionName) {
        Update update = evaluateUpdateExpression();
        if (update == null) {
            return result;
        }
        if (result instanceof Mono) {
            return updateSingle((Mono<?>) result, update, collectionName);
        }
        // The only callers pass the outcome of findOne(...) or find(...), so anything
        // that is not a Mono is expected to be a Flux here.
        return updateMulti((Flux<?>) result, update, collectionName);
    }

    /**
     * Update the single found entity, matched by its id, and then emit that entity.
     *
     * @param result the single-result publisher
     * @param update the update to apply
     * @param collectionName the collection to update
     * @return a publisher emitting the originally fetched entity once the update has completed
     */
    private Publisher<?> updateSingle(Mono<?> result, Update update, String collectionName) {
        return result.flatMap(entity -> {
            Pair<String, Object> idPair = idForEntity(entity);
            Query byId = new Query(Criteria.where(idPair.getFirst()).is(idPair.getSecond()));
            return this.reactiveMongoTemplate.updateFirst(byId, update, collectionName)
                    .thenReturn(entity);
        });
    }

    /**
     * Update all found entities with a single multi-document update, matched by their ids,
     * and then emit those entities.
     *
     * <p>When the find yields no entities there is nothing to update, so the update is
     * skipped and the returned publisher simply completes empty.
     * NOTE(review): how {@code getByIdInQuery(...)} treats an empty collection is not
     * visible from this class; skipping avoids depending on that.
     *
     * @param result the multi-result publisher
     * @param update the update to apply
     * @param collectionName the collection to update
     * @return a publisher emitting the originally fetched entities once the update has completed
     */
    private Publisher<?> updateMulti(Flux<?> result, Update update, String collectionName) {
        return result.collectList()
                // An empty list turns the Mono empty, which makes flatMapMany complete without emitting.
                .filter(entities -> !entities.isEmpty())
                .flatMapMany(entities ->
                        this.reactiveMongoTemplate.updateMulti(getByIdInQuery(entities), update, collectionName)
                                .thenMany(Flux.fromIterable(entities)));
    }
}
