package org.springframework.integration.cassandra.outbound;

import com.datastax.oss.driver.api.core.DriverException;
import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.StreamSupport;
import org.springframework.dao.DataAccessException;
import org.springframework.data.cassandra.ReactiveResultSet;
import org.springframework.data.cassandra.ReactiveSession;
import org.springframework.data.cassandra.core.InsertOptions;
import org.springframework.data.cassandra.core.ReactiveCassandraOperations;
import org.springframework.data.cassandra.core.UpdateOptions;
import org.springframework.data.cassandra.core.WriteResult;
import org.springframework.data.cassandra.core.cql.QueryOptionsUtil;
import org.springframework.data.cassandra.core.cql.WriteOptions;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.expression.spel.support.StandardTypeLocator;
import org.springframework.integration.expression.ExpressionEvalMap;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor;
import org.springframework.integration.handler.MessageProcessor;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/springframework/integration/cassandra/outbound/CassandraMessageHandler.class */
/**
 * Message handler that writes request payloads to Cassandra through a
 * {@link ReactiveCassandraOperations}.
 *
 * <p>The operation performed is selected by the configured {@link Type}; a payload that is
 * itself a {@link Statement} is always executed as a statement regardless of the configured
 * mode. When {@link #setProducesReply(boolean)} is enabled the {@link WriteResult} (or the
 * {@link Mono} emitting it, in async mode) is returned as the reply; otherwise the operation
 * is only subscribed to (async) or blocked on (sync) and no reply is produced.
 */
public class CassandraMessageHandler extends AbstractReplyProducingMessageHandler {

    private static final String INGEST_PAYLOAD_MESSAGE =
            "to perform 'ingest' the 'payload' must be of 'List<List<?>>' type.";

    private final Map<String, Expression> parameterExpressions = new HashMap<>();

    private final ReactiveCassandraOperations cassandraOperations;

    private Type mode;

    private boolean producesReply;

    private String ingestQuery;

    private WriteOptions writeOptions;

    private ReactiveSessionMessageCallback sessionMessageCallback;

    /** Created in {@link #doInit()}; {@code null} until the handler has been initialized. */
    private StandardEvaluationContext evaluationContext;

    /**
     * Callback executing a CQL operation for a request message within a {@link ReactiveSession}.
     */
    @FunctionalInterface
    public interface ReactiveSessionMessageCallback {

        /**
         * Execute an operation for the given message in the given session.
         *
         * @param reactiveSession the session to execute against
         * @param message the request message
         * @return a {@link Mono} emitting the result set of the operation
         * @throws DriverException as declared for driver-level failures
         * @throws DataAccessException as declared for data-access failures
         */
        Mono<ReactiveResultSet> doInSession(ReactiveSession reactiveSession, Message<?> message)
                throws DriverException, DataAccessException;
    }

    /**
     * {@link WriteResult} populated from a {@link ReactiveResultSet}: its execution infos, its
     * applied flag and the rows it currently has available.
     */
    private static final class ReactiveWriteResult extends WriteResult {

        ReactiveWriteResult(ReactiveResultSet reactiveResultSet) {
            super(reactiveResultSet.getAllExecutionInfo(),
                    reactiveResultSet.wasApplied(),
                    StreamSupport.stream(reactiveResultSet.availableRows().toIterable().spliterator(), false)
                            .toList());
        }
    }

    /**
     * The kind of operation this handler performs for a request message.
     */
    public enum Type {
        /** Insert the payload entity (or list of entities); also used for ingest queries. */
        INSERT,
        /** Update the payload entity (or list of entities). */
        UPDATE,
        /** Delete the payload entity (or list of entities). */
        DELETE,
        /** Execute a statement: the payload itself, or one produced by the configured query/processor. */
        STATEMENT
    }

    /**
     * Create a handler operating in {@link Type#INSERT} mode.
     *
     * @param reactiveCassandraOperations the operations to delegate to; must not be {@code null}
     */
    public CassandraMessageHandler(ReactiveCassandraOperations reactiveCassandraOperations) {
        this(reactiveCassandraOperations, Type.INSERT);
    }

    /**
     * Create a handler for the given mode. The handler is switched to async mode and its write
     * options are seeded with the empty options matching the mode.
     *
     * @param reactiveCassandraOperations the operations to delegate to; must not be {@code null}
     * @param type the operation mode; must not be {@code null}
     */
    public CassandraMessageHandler(ReactiveCassandraOperations reactiveCassandraOperations, Type type) {
        Assert.notNull(reactiveCassandraOperations, "'cassandraOperations' must not be null.");
        Assert.notNull(type, "'queryType' must not be null.");
        this.cassandraOperations = reactiveCassandraOperations;
        this.mode = type;
        setAsync(true);
        this.writeOptions = switch (type) {
            case INSERT -> InsertOptions.empty();
            case UPDATE -> UpdateOptions.empty();
            case DELETE, STATEMENT -> WriteOptions.empty();
        };
    }

    /**
     * Set a CQL query used to ingest a {@code List<List<?>>} payload as one unlogged batch of
     * bound prepared statements. Switches the handler to {@link Type#INSERT} mode.
     *
     * @param str the ingest CQL; must contain text
     */
    public void setIngestQuery(String str) {
        Assert.hasText(str, "'ingestQuery' must not be empty");
        this.ingestQuery = str;
        this.mode = Type.INSERT;
    }

    /**
     * Set the options applied to write operations.
     *
     * @param writeOptions the options; must not be {@code null}
     */
    public void setWriteOptions(WriteOptions writeOptions) {
        Assert.notNull(writeOptions, "'writeOptions' must not be null");
        this.writeOptions = writeOptions;
    }

    /**
     * Configure whether the result of the operation is returned as a reply.
     *
     * @param z {@code true} to produce a reply
     */
    public void setProducesReply(boolean z) {
        this.producesReply = z;
    }

    /**
     * Parse the given expression string and use it as the statement expression.
     *
     * @param str the expression source
     * @see #setStatementExpression(Expression)
     */
    public void setStatementExpressionString(String str) {
        setStatementExpression(EXPRESSION_PARSER.parseExpression(str));
    }

    /**
     * Set an expression evaluated against the request message to produce the {@link Statement}
     * to execute. The expression is evaluated with this handler's evaluation context.
     *
     * @param expression the expression to evaluate
     */
    public void setStatementExpression(Expression expression) {
        // Statement.class is a raw class token; viewing it through the wildcard type is safe.
        @SuppressWarnings("unchecked")
        Class<Statement<?>> statementType = (Class<Statement<?>>) (Class<?>) Statement.class;
        setStatementProcessor(new ExpressionEvaluatingMessageProcessor<Statement<?>>(expression, statementType) {

            @Override
            protected StandardEvaluationContext getEvaluationContext() {
                return CassandraMessageHandler.this.evaluationContext;
            }
        });
    }

    /**
     * Set a CQL query executed for each request message, with arguments obtained by evaluating
     * the configured parameter expressions against the message. Switches the handler to
     * {@link Type#STATEMENT} mode.
     *
     * @param str the CQL query; must contain text
     */
    public void setQuery(String str) {
        Assert.hasText(str, "'query' must not be empty");
        this.sessionMessageCallback = (session, requestMessage) ->
                session.execute(str,
                        ExpressionEvalMap.from(this.parameterExpressions)
                                .usingEvaluationContext(this.evaluationContext)
                                .withRoot(requestMessage)
                                .build());
        this.mode = Type.STATEMENT;
    }

    /**
     * Replace the expressions used to compute the arguments of the configured query.
     *
     * @param map the expressions keyed by name; must not be empty
     */
    public void setParameterExpressions(Map<String, Expression> map) {
        Assert.notEmpty(map, "'parameterExpressions' must not be empty.");
        this.parameterExpressions.clear();
        this.parameterExpressions.putAll(map);
    }

    /**
     * Set a processor producing the {@link Statement} to execute for each request message; the
     * configured write options are added to the produced statement. Switches the handler to
     * {@link Type#STATEMENT} mode.
     *
     * @param messageProcessor the processor; must not be {@code null}
     */
    public void setStatementProcessor(MessageProcessor<Statement<?>> messageProcessor) {
        Assert.notNull(messageProcessor, "'statementProcessor' must not be null.");
        this.sessionMessageCallback = (session, requestMessage) ->
                session.execute(
                        QueryOptionsUtil.addQueryOptions(messageProcessor.processMessage(requestMessage),
                                this.writeOptions));
        this.mode = Type.STATEMENT;
    }

    @Override
    public String getComponentType() {
        return "cassandra:outbound-" + (this.producesReply ? "gateway" : "channel-adapter");
    }

    /**
     * Create the evaluation context and, when its type locator is a {@link StandardTypeLocator},
     * register the {@link QueryBuilder} package as an import for expressions.
     */
    @Override
    protected void doInit() {
        super.doInit();
        this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
        if (this.evaluationContext.getTypeLocator() instanceof StandardTypeLocator standardTypeLocator) {
            standardTypeLocator.registerImport(QueryBuilder.class.getPackage().getName());
        }
    }

    /**
     * Perform the Cassandra operation for the request message.
     *
     * @param message the request message
     * @return the result {@link Mono} (async) or the blocked-for {@link WriteResult} (sync) when
     * producing replies; {@code null} otherwise
     */
    @Override
    protected Object handleRequestMessage(Message<?> message) {
        Object payload = message.getPayload();
        // A Statement payload is always executed directly, whatever mode is configured.
        Type effectiveMode = payload instanceof Statement<?> ? Type.STATEMENT : this.mode;
        Mono<? extends WriteResult> result = switch (effectiveMode) {
            case INSERT -> handleInsert(payload);
            case UPDATE -> handleUpdate(payload);
            case DELETE -> handleDelete(payload);
            case STATEMENT -> handleStatement(message);
        };

        if (this.producesReply) {
            return isAsync() ? result : result.block();
        }
        if (isAsync()) {
            result.subscribe();
        }
        else {
            result.block();
        }
        return null;
    }

    /**
     * Insert the payload: as an ingest batch when an ingest query is configured, otherwise as a
     * batch insert for a {@link List} payload or a single-entity insert.
     */
    private Mono<? extends WriteResult> handleInsert(Object obj) {
        if (this.ingestQuery != null) {
            return handleIngest(obj);
        }
        if (obj instanceof List<?> entities) {
            return this.cassandraOperations.batchOps().insert(entities, this.writeOptions).execute();
        }
        // The single-entity insert only accepts InsertOptions, so narrow explicitly.
        if (this.writeOptions instanceof InsertOptions insertOptions) {
            return this.cassandraOperations.insert(obj, insertOptions);
        }
        throw new IllegalStateException(
                "'writeOptions' must be an instance of 'InsertOptions' to insert a single entity, but was: "
                        + this.writeOptions.getClass().getName());
    }

    /**
     * Prepare the ingest query, bind it once per row of the {@code List<List<?>>} payload and
     * execute all bound statements as a single unlogged batch.
     */
    private Mono<WriteResult> handleIngest(Object obj) {
        Assert.isInstanceOf(List.class, obj, INGEST_PAYLOAD_MESSAGE);
        for (Object row : (List<?>) obj) {
            Assert.isInstanceOf(List.class, row, INGEST_PAYLOAD_MESSAGE);
        }
        // Every element has just been verified to be a List.
        @SuppressWarnings("unchecked")
        List<List<?>> rows = (List<List<?>>) obj;

        // The lambda parameter is explicitly typed to select the session-callback overload of
        // execute(..); an implicitly typed lambda is ambiguous with the statement-creator overload.
        return this.cassandraOperations.getReactiveCqlOperations()
                .execute((ReactiveSession session) ->
                        session.prepare(
                                        QueryOptionsUtil.addQueryOptions(
                                                SimpleStatement.newInstance(this.ingestQuery), this.writeOptions))
                                .flatMapMany(prepared ->
                                        Flux.fromIterable(rows).map(row -> prepared.bind(row.toArray())))
                                .collect(() -> new BatchStatementBuilder(BatchType.UNLOGGED),
                                        BatchStatementBuilder::addStatement)
                                .map(BatchStatementBuilder::build)
                                .flatMap(session::execute)
                                .transform(this::transformToWriteResult))
                .next();
    }

    /**
     * Update the payload: as a batch for a {@link List} payload, otherwise as a single entity.
     */
    private Mono<? extends WriteResult> handleUpdate(Object obj) {
        if (obj instanceof List<?> entities) {
            return this.cassandraOperations.batchOps().update(entities, this.writeOptions).execute();
        }
        // The single-entity update only accepts UpdateOptions, so narrow explicitly.
        if (this.writeOptions instanceof UpdateOptions updateOptions) {
            return this.cassandraOperations.update(obj, updateOptions);
        }
        throw new IllegalStateException(
                "'writeOptions' must be an instance of 'UpdateOptions' to update a single entity, but was: "
                        + this.writeOptions.getClass().getName());
    }

    /**
     * Delete the payload: as a batch for a {@link List} payload, otherwise as a single entity.
     * The configured write options are applied in both cases.
     */
    private Mono<WriteResult> handleDelete(Object obj) {
        if (obj instanceof List<?> entities) {
            return this.cassandraOperations.batchOps().delete(entities, this.writeOptions).execute();
        }
        return this.cassandraOperations.delete(obj, this.writeOptions);
    }

    /**
     * Execute a statement: the payload itself when it is a {@link Statement}, otherwise the
     * operation configured through {@link #setQuery(String)} or
     * {@link #setStatementProcessor(MessageProcessor)}.
     */
    private Mono<WriteResult> handleStatement(Message<?> message) {
        Object payload = message.getPayload();
        Mono<ReactiveResultSet> resultSet;
        if (payload instanceof Statement<?> statement) {
            resultSet = this.cassandraOperations.getReactiveCqlOperations().queryForResultSet(statement);
        }
        else {
            Assert.state(this.sessionMessageCallback != null,
                    "A 'query', 'statementExpression' or 'statementProcessor' must be configured "
                            + "to handle a non-Statement payload in 'STATEMENT' mode.");
            // Explicitly typed lambda: see the overload note in handleIngest().
            resultSet = this.cassandraOperations.getReactiveCqlOperations()
                    .execute((ReactiveSession session) ->
                            this.sessionMessageCallback.doInSession(session, message))
                    .next();
        }
        return resultSet.transform(this::transformToWriteResult);
    }

    /** Map the emitted {@link ReactiveResultSet} to a {@link WriteResult}. */
    private Mono<WriteResult> transformToWriteResult(Mono<ReactiveResultSet> mono) {
        return mono.map(ReactiveWriteResult::new);
    }
}
