package org.springframework.data.cassandra.core.cql.session;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BatchableStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.cassandra.ReactiveResultSet;
import org.springframework.data.cassandra.ReactiveSession;
import org.springframework.data.cassandra.core.cql.session.init.ScriptUtils;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* loaded from: input_file:org/springframework/data/cassandra/core/cql/session/DefaultBridgedReactiveSession.class */
public class DefaultBridgedReactiveSession implements ReactiveSession {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final CqlSession session;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/springframework/data/cassandra/core/cql/session/DefaultBridgedReactiveSession$DefaultReactiveResultSet.class */
    public static class DefaultReactiveResultSet implements ReactiveResultSet {
        private final AsyncResultSet resultSet;
        private final boolean wasApplied;

        DefaultReactiveResultSet(AsyncResultSet asyncResultSet) {
            boolean z;
            this.resultSet = asyncResultSet;
            try {
                z = asyncResultSet.wasApplied();
            } catch (Exception e) {
                z = false;
            }
            this.wasApplied = z;
        }

        @Override // org.springframework.data.cassandra.ReactiveResultSet
        public Flux<Row> rows() {
            return getRows(Mono.just(this.resultSet));
        }

        @Override // org.springframework.data.cassandra.ReactiveResultSet
        public Flux<Row> availableRows() {
            return toRows(this.resultSet);
        }

        private Flux<Row> getRows(Mono<AsyncResultSet> mono) {
            return mono.flatMapMany(asyncResultSet -> {
                Flux<Row> rows = toRows(asyncResultSet);
                if (!asyncResultSet.hasMorePages()) {
                    return rows;
                }
                MonoProcessor create = MonoProcessor.create();
                return rows.doOnComplete(() -> {
                    fetchMore(asyncResultSet.fetchNextPage(), create);
                }).concatWith(getRows(create));
            });
        }

        static Flux<Row> toRows(AsyncResultSet asyncResultSet) {
            return Flux.fromIterable(asyncResultSet.currentPage());
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public static void fetchMore(CompletionStage<AsyncResultSet> completionStage, MonoProcessor<AsyncResultSet> monoProcessor) {
            try {
                completionStage.whenComplete((asyncResultSet, th) -> {
                    if (th != null) {
                        monoProcessor.onError(th);
                    } else {
                        monoProcessor.onNext(asyncResultSet);
                        monoProcessor.onComplete();
                    }
                });
            } catch (Exception e) {
                monoProcessor.onError(e);
            }
        }

        @Override // org.springframework.data.cassandra.ReactiveResultSet
        public ColumnDefinitions getColumnDefinitions() {
            return this.resultSet.getColumnDefinitions();
        }

        @Override // org.springframework.data.cassandra.ReactiveResultSet
        public boolean wasApplied() {
            return this.wasApplied;
        }

        @Override // org.springframework.data.cassandra.ReactiveResultSet
        public ExecutionInfo getExecutionInfo() {
            return this.resultSet.getExecutionInfo();
        }

        @Override // org.springframework.data.cassandra.ReactiveResultSet
        public List<ExecutionInfo> getAllExecutionInfo() {
            return Collections.singletonList(getExecutionInfo());
        }
    }

    public DefaultBridgedReactiveSession(CqlSession cqlSession) {
        Assert.notNull(cqlSession, "Session must not be null");
        this.session = cqlSession;
    }

    @Override // org.springframework.data.cassandra.ReactiveSession
    public Metadata getMetadata() {
        return this.session.getMetadata();
    }

    @Override // org.springframework.data.cassandra.ReactiveSession
    public Optional<CqlIdentifier> getKeyspace() {
        return this.session.getKeyspace();
    }

    @Override // org.springframework.data.cassandra.ReactiveSession
    public boolean isClosed() {
        return this.session.isClosed();
    }

    @Override // org.springframework.data.cassandra.ReactiveSession
    public DriverContext getContext() {
        return this.session.getContext();
    }

    @Override // org.springframework.data.cassandra.ReactiveSession
    public Mono<ReactiveResultSet> execute(String str) {
        Assert.hasText(str, "Query must not be empty");
        return execute((Statement<?>) SimpleStatement.newInstance(str));
    }

    @Override // org.springframework.data.cassandra.ReactiveSession
    public Mono<ReactiveResultSet> execute(String str, Object... objArr) {
        Assert.hasText(str, "Query must not be empty");
        return execute((Statement<?>) SimpleStatement.newInstance(str, objArr));
    }

    @Override // org.springframework.data.cassandra.ReactiveSession
    public Mono<ReactiveResultSet> execute(String str, Map<String, Object> map) {
        Assert.hasText(str, "Query must not be empty");
        return execute((Statement<?>) SimpleStatement.newInstance(str, map));
    }

    @Override // org.springframework.data.cassandra.ReactiveSession
    public Mono<ReactiveResultSet> execute(Statement<?> statement) {
        Assert.notNull(statement, "Statement must not be null");
        return Mono.fromCompletionStage(() -> {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Executing statement [{}]", getCql(statement));
            }
            return this.session.executeAsync(statement);
        }).map(DefaultReactiveResultSet::new);
    }

    @Override // org.springframework.data.cassandra.ReactiveSession
    public Mono<PreparedStatement> prepare(String str) {
        Assert.hasText(str, "Query must not be empty");
        return prepare(SimpleStatement.newInstance(str));
    }

    @Override // org.springframework.data.cassandra.ReactiveSession
    public Mono<PreparedStatement> prepare(SimpleStatement simpleStatement) {
        Assert.notNull(simpleStatement, "Statement must not be null");
        return Mono.fromCompletionStage(() -> {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Preparing statement [{}]", getCql(simpleStatement));
            }
            return this.session.prepareAsync(simpleStatement);
        });
    }

    private static String getCql(Object obj) {
        if (obj instanceof SimpleStatement) {
            return ((SimpleStatement) obj).getQuery();
        }
        if (obj instanceof PreparedStatement) {
            return ((PreparedStatement) obj).getQuery();
        }
        if (obj instanceof BoundStatement) {
            return getCql(((BoundStatement) obj).getPreparedStatement());
        }
        if (!(obj instanceof BatchStatement)) {
            return String.format("Unknown: %s", obj);
        }
        StringBuilder sb = new StringBuilder();
        Iterator it = ((BatchStatement) obj).iterator();
        while (it.hasNext()) {
            String cql = getCql((BatchableStatement) it.next());
            sb.append(cql).append(cql.endsWith(ScriptUtils.DEFAULT_STATEMENT_SEPARATOR) ? "" : ScriptUtils.DEFAULT_STATEMENT_SEPARATOR);
        }
        return sb.toString();
    }

    @Override // org.springframework.data.cassandra.ReactiveSession, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.session.close();
    }
}
