package org.springframework.data.cassandra.core;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.DriverException;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.querybuilder.delete.Delete;
import com.datastax.oss.driver.api.querybuilder.insert.RegularInsert;
import com.datastax.oss.driver.api.querybuilder.select.Select;
import java.util.Collections;
import java.util.function.BiConsumer;
import java.util.function.Function;
import lombok.NonNull;
import org.reactivestreams.Publisher;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.data.cassandra.ReactiveResultSet;
import org.springframework.data.cassandra.ReactiveSession;
import org.springframework.data.cassandra.ReactiveSessionFactory;
import org.springframework.data.cassandra.core.EntityOperations;
import org.springframework.data.cassandra.core.ReactiveDeleteOperation;
import org.springframework.data.cassandra.core.ReactiveInsertOperation;
import org.springframework.data.cassandra.core.ReactiveSelectOperation;
import org.springframework.data.cassandra.core.ReactiveUpdateOperation;
import org.springframework.data.cassandra.core.convert.CassandraConverter;
import org.springframework.data.cassandra.core.convert.MappingCassandraConverter;
import org.springframework.data.cassandra.core.cql.CassandraAccessor;
import org.springframework.data.cassandra.core.cql.CqlProvider;
import org.springframework.data.cassandra.core.cql.QueryOptions;
import org.springframework.data.cassandra.core.cql.ReactiveCqlOperations;
import org.springframework.data.cassandra.core.cql.ReactiveCqlTemplate;
import org.springframework.data.cassandra.core.cql.ReactiveSessionCallback;
import org.springframework.data.cassandra.core.cql.RowMapper;
import org.springframework.data.cassandra.core.cql.WriteOptions;
import org.springframework.data.cassandra.core.cql.session.DefaultReactiveSessionFactory;
import org.springframework.data.cassandra.core.cql.util.StatementBuilder;
import org.springframework.data.cassandra.core.mapping.CassandraPersistentEntity;
import org.springframework.data.cassandra.core.mapping.event.AfterConvertEvent;
import org.springframework.data.cassandra.core.mapping.event.AfterDeleteEvent;
import org.springframework.data.cassandra.core.mapping.event.AfterLoadEvent;
import org.springframework.data.cassandra.core.mapping.event.AfterSaveEvent;
import org.springframework.data.cassandra.core.mapping.event.BeforeDeleteEvent;
import org.springframework.data.cassandra.core.mapping.event.BeforeSaveEvent;
import org.springframework.data.cassandra.core.mapping.event.CassandraMappingEvent;
import org.springframework.data.cassandra.core.mapping.event.ReactiveBeforeConvertCallback;
import org.springframework.data.cassandra.core.mapping.event.ReactiveBeforeSaveCallback;
import org.springframework.data.cassandra.core.query.Query;
import org.springframework.data.cassandra.core.query.Update;
import org.springframework.data.domain.Slice;
import org.springframework.data.domain.SliceImpl;
import org.springframework.data.mapping.callback.ReactiveEntityCallbacks;
import org.springframework.data.projection.SpelAwareProxyProjectionFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;

/* loaded from: input_file:org/springframework/data/cassandra/core/ReactiveCassandraTemplate.class */
public class ReactiveCassandraTemplate implements ReactiveCassandraOperations, ApplicationEventPublisherAware, ApplicationContextAware {

    // Target for mapping lifecycle events; while null, maybeEmitEvent() publishes nothing.
    @Nullable
    private ApplicationEventPublisher eventPublisher;

    // Callbacks used by maybeCallBeforeConvert()/maybeCallBeforeSave(); while null, entities pass through unchanged.
    @Nullable
    private ReactiveEntityCallbacks entityCallbacks;
    // Converter supplied to the constructor; exposed through getConverter().
    private final CassandraConverter converter;
    // Built in the constructor from the converter's mapping context.
    private final EntityOperations entityOperations;
    // CQL operations every statement of this template is executed through.
    private final ReactiveCqlOperations cqlOperations;
    // Creates projections when the requested return type is an interface (see getMapper()).
    private final SpelAwareProxyProjectionFactory projectionFactory;
    // Builds the select/insert/update/delete/count statements used by this template.
    private final StatementFactory statementFactory;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/springframework/data/cassandra/core/ReactiveCassandraTemplate$StatementCallback.class */
    public static final class StatementCallback implements ReactiveSessionCallback<WriteResult>, CqlProvider {

        /** Statement executed by this callback; never {@code null} (enforced by the constructor). */
        @NonNull
        private final SimpleStatement statement;

        /**
         * Creates a callback around the given statement.
         *
         * @param simpleStatement the statement to execute; must not be {@code null}.
         * @throws NullPointerException if {@code simpleStatement} is {@code null}.
         */
        public StatementCallback(@NonNull SimpleStatement simpleStatement) {
            if (simpleStatement == null) {
                throw new NullPointerException("statement is marked non-null but is null");
            }
            this.statement = simpleStatement;
        }

        /**
         * Executes the wrapped statement on the given session and converts the result set
         * into a {@link WriteResult}.
         */
        @Override
        public Publisher<WriteResult> doInSession(ReactiveSession reactiveSession) throws DriverException, DataAccessException {
            return reactiveSession.execute((Statement<?>) this.statement)
                    .flatMap(resultSet -> toWriteResult(resultSet));
        }

        /** Returns the query string of the wrapped statement. */
        @Override
        public String getCql() {
            return this.statement.getQuery();
        }

        /** Returns the wrapped statement. */
        @NonNull
        public SimpleStatement getStatement() {
            return this.statement;
        }

        /**
         * Collects all rows of the result set and wraps them, together with the execution
         * info and the applied flag, in a {@link WriteResult}.
         */
        private static Mono<WriteResult> toWriteResult(ReactiveResultSet reactiveResultSet) {
            return reactiveResultSet.rows()
                    .collectList()
                    .map(rows -> new WriteResult(reactiveResultSet.getAllExecutionInfo(), reactiveResultSet.wasApplied(), rows));
        }

        /** Two callbacks are equal when their statements are equal. */
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof StatementCallback)) {
                return false;
            }
            SimpleStatement mine = getStatement();
            SimpleStatement theirs = ((StatementCallback) obj).getStatement();
            if (mine == null) {
                return theirs == null;
            }
            return mine.equals(theirs);
        }

        /** Hash code derived from the statement only, consistent with {@link #equals(Object)}. */
        public int hashCode() {
            SimpleStatement current = getStatement();
            int statementHash = (current == null) ? 43 : current.hashCode();
            return 59 + statementHash;
        }

        public String toString() {
            StringBuilder text = new StringBuilder("ReactiveCassandraTemplate.StatementCallback(statement=");
            text.append(getStatement());
            text.append(")");
            return text.toString();
        }
    }

    /**
     * Creates a template for the given session using a default converter obtained from
     * {@code newConverter()}.
     *
     * @param reactiveSession the session to run statements against.
     */
    public ReactiveCassandraTemplate(ReactiveSession reactiveSession) {
        this(reactiveSession, newConverter());
    }

    /**
     * Creates a template for the given session and converter; the session is wrapped in a
     * {@link DefaultReactiveSessionFactory}.
     *
     * @param reactiveSession the session to run statements against.
     * @param cassandraConverter the converter to use.
     */
    public ReactiveCassandraTemplate(ReactiveSession reactiveSession, CassandraConverter cassandraConverter) {
        this(new DefaultReactiveSessionFactory(reactiveSession), cassandraConverter);
    }

    /**
     * Creates a template for the given session factory and converter; statements are executed
     * through a new {@link ReactiveCqlTemplate} built on the factory.
     *
     * @param reactiveSessionFactory the factory providing sessions.
     * @param cassandraConverter the converter to use.
     */
    public ReactiveCassandraTemplate(ReactiveSessionFactory reactiveSessionFactory, CassandraConverter cassandraConverter) {
        this(new ReactiveCqlTemplate(reactiveSessionFactory), cassandraConverter);
    }

    /**
     * Creates a template on top of the given CQL operations and converter and initialises the
     * entity operations, projection factory and statement factory.
     *
     * @param reactiveCqlOperations the operations used to execute statements; must not be {@code null}.
     * @param cassandraConverter the converter to use; must not be {@code null}.
     */
    public ReactiveCassandraTemplate(ReactiveCqlOperations reactiveCqlOperations, CassandraConverter cassandraConverter) {
        Assert.notNull(reactiveCqlOperations, "ReactiveCqlOperations must not be null");
        Assert.notNull(cassandraConverter, "CassandraConverter must not be null");

        this.cqlOperations = reactiveCqlOperations;
        this.converter = cassandraConverter;

        // NOTE(review): mo22getMappingContext looks like a decompiler-renamed accessor
        // (presumably getMappingContext()) — confirm against the converter type.
        this.entityOperations = new EntityOperations(cassandraConverter.mo22getMappingContext());
        this.statementFactory = new StatementFactory(cassandraConverter);
        this.projectionFactory = new SpelAwareProxyProjectionFactory();
    }

    /**
     * Creates a new batch operations object bound to this template.
     */
    @Override
    public ReactiveCassandraBatchOperations batchOps() {
        ReactiveCassandraBatchTemplate batchTemplate = new ReactiveCassandraBatchTemplate(this);
        return batchTemplate;
    }

    /**
     * Stores the publisher that {@code maybeEmitEvent} uses to publish mapping events.
     *
     * @param applicationEventPublisher the publisher to use.
     */
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.eventPublisher = applicationEventPublisher;
    }

    /**
     * Wires the template to the application context: entity callbacks are created from the
     * context unless callbacks were already set, and the projection factory receives the
     * context as bean factory together with its class loader.
     *
     * @param applicationContext the context this template runs in.
     */
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        boolean callbacksMissing = (null == this.entityCallbacks);
        if (callbacksMissing) {
            // Only derive callbacks when none were configured explicitly.
            setEntityCallbacks(ReactiveEntityCallbacks.create(applicationContext));
        }

        SpelAwareProxyProjectionFactory factory = this.projectionFactory;
        factory.setBeanFactory(applicationContext);
        factory.setBeanClassLoader(applicationContext.getClassLoader());
    }

    /**
     * Sets the entity callbacks consulted before convert and before save.
     *
     * @param reactiveEntityCallbacks the callbacks to use; {@code null} leaves entities unmodified by callbacks.
     */
    public void setEntityCallbacks(@Nullable ReactiveEntityCallbacks reactiveEntityCallbacks) {
        this.entityCallbacks = reactiveEntityCallbacks;
    }

    /**
     * Returns the converter this template was created with.
     */
    @Override
    public CassandraConverter getConverter() {
        return converter;
    }

    /**
     * Returns the entity operations created in the constructor.
     */
    protected EntityOperations getEntityOperations() {
        return entityOperations;
    }

    /**
     * Returns the projection factory used to create interface projections.
     */
    protected SpelAwareProxyProjectionFactory getProjectionFactory() {
        return projectionFactory;
    }

    /**
     * Returns the CQL operations statements are executed through.
     */
    @Override
    public ReactiveCqlOperations getReactiveCqlOperations() {
        return cqlOperations;
    }

    /**
     * Looks up the persistent entity for the given type through the entity operations.
     */
    private CassandraPersistentEntity<?> getRequiredPersistentEntity(Class<?> cls) {
        EntityOperations operations = getEntityOperations();
        return operations.getRequiredPersistentEntity(cls);
    }

    /**
     * Returns the statement factory created in the constructor.
     */
    protected StatementFactory getStatementFactory() {
        return statementFactory;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the table name of the persistent entity mapped to the given type.
     */
    public CqlIdentifier getTableName(Class<?> cls) {
        CassandraPersistentEntity<?> entity = getRequiredPersistentEntity(cls);
        return entity.getTableName();
    }

    /**
     * Runs the given CQL string as a simple statement and maps each row to {@code cls}.
     *
     * @param str the CQL to execute; must contain text.
     * @param cls the type to map rows to.
     */
    @Override
    public <T> Flux<T> select(String str, Class<T> cls) {
        Assert.hasText(str, "CQL must not be empty");
        Statement<?> statement = SimpleStatement.newInstance(str);
        return select(statement, cls);
    }

    /**
     * Runs the given CQL string and returns the first mapped result, if any.
     */
    @Override
    public <T> Mono<T> selectOne(String str, Class<T> cls) {
        Flux<T> results = select(str, cls);
        return results.next();
    }

    /**
     * Executes the statement and maps every row to {@code cls} using the mapper from
     * {@code getMapper}, which also publishes the load/convert events.
     *
     * @param statement the statement to execute; must not be {@code null}.
     * @param cls the type to map rows to; must not be {@code null}.
     */
    @Override
    public <T> Flux<T> select(Statement<?> statement, Class<T> cls) {
        Assert.notNull(statement, "Statement must not be null");
        Assert.notNull(cls, "Entity type must not be null");

        CqlIdentifier table = EntityQueryUtils.getTableName(statement);
        Function<Row, T> rowToEntity = getMapper(cls, cls, table);
        return getReactiveCqlOperations().query(statement, (row, rowNumber) -> rowToEntity.apply(row));
    }

    /**
     * Executes the statement and returns the first mapped result, if any.
     */
    @Override
    public <T> Mono<T> selectOne(Statement<?> statement, Class<T> cls) {
        Flux<T> results = select(statement, cls);
        return results.next();
    }

    /**
     * Executes the statement and reads the currently available rows into a {@link Slice}.
     * Rows are converted directly through the converter; when no result set is emitted an
     * empty slice is returned.
     *
     * @param statement the statement to execute; must not be {@code null}.
     * @param cls the type to map rows to; must not be {@code null}.
     */
    @Override
    public <T> Mono<Slice<T>> slice(Statement<?> statement, Class<T> cls) {
        Assert.notNull(statement, "Statement must not be null");
        Assert.notNull(cls, "Entity type must not be null");

        Mono<ReactiveResultSet> resultSet = getReactiveCqlOperations().queryForResultSet(statement);
        Mono<Integer> fetchSize = getEffectiveFetchSize(statement);
        RowMapper<T> entityMapper = (row, rowNumber) -> getConverter().read(cls, row);

        Mono<Slice<T>> slice = resultSet.zipWith(fetchSize).flatMap(pair -> {
            ReactiveResultSet rs = pair.getT1();
            Integer pageSize = pair.getT2();
            // The paging state of the result set is handed to readSlice together with the rows.
            return rs.availableRows()
                    .collectList()
                    .map(rows -> EntityQueryUtils.readSlice(rows, rs.getExecutionInfo().getPagingState(), entityMapper, 1, pageSize.intValue()));
        });
        return slice.defaultIfEmpty(new SliceImpl<>(Collections.emptyList()));
    }

    /**
     * Selects entities of type {@code cls} matching the query from the table mapped to {@code cls}.
     *
     * @param query the query; must not be {@code null}.
     * @param cls the entity type; must not be {@code null}.
     */
    @Override
    public <T> Flux<T> select(Query query, Class<T> cls) throws DataAccessException {
        Assert.notNull(query, "Query must not be null");
        Assert.notNull(cls, "Entity type must not be null");

        CqlIdentifier table = getTableName(cls);
        return doSelect(query, cls, table, cls);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds a select statement for the query, restricted to the columns computed for the
     * projection onto {@code cls2}, executes it against the given table and maps the rows.
     *
     * @param query the query to run.
     * @param cls the entity type the statement is built for.
     * @param cqlIdentifier the table to select from.
     * @param cls2 the type results are returned as.
     */
    public <T> Flux<T> doSelect(Query query, Class<?> cls, CqlIdentifier cqlIdentifier, Class<T> cls2) {
        CassandraPersistentEntity<?> entity = getRequiredPersistentEntity(cls);
        StatementFactory factory = getStatementFactory();

        Query projected = query.columns(factory.computeColumnsForProjection(query.getColumns(), entity, cls2));
        StatementBuilder<Select> builder = factory.select(projected, entity, cqlIdentifier);

        Function<Row, T> rowToResult = getMapper(cls, cls2, cqlIdentifier);
        Statement<?> statement = builder.build();
        return getReactiveCqlOperations().query(statement, (row, rowNumber) -> rowToResult.apply(row));
    }

    /**
     * Selects the first entity matching the query, if any.
     *
     * @param query the query; must not be {@code null}.
     * @param cls the entity type; must not be {@code null}.
     */
    @Override
    public <T> Mono<T> selectOne(Query query, Class<T> cls) throws DataAccessException {
        Assert.notNull(query, "Query must not be null");
        Assert.notNull(cls, "Entity type must not be null");

        Flux<T> matches = select(query, cls);
        return matches.next();
    }

    /**
     * Builds a select statement for the query and delegates to the statement-based
     * {@code slice} overload.
     *
     * @param query the query; must not be {@code null}.
     * @param cls the entity type; must not be {@code null}.
     */
    @Override
    public <T> Mono<Slice<T>> slice(Query query, Class<T> cls) throws DataAccessException {
        Assert.notNull(query, "Query must not be null");
        Assert.notNull(cls, "Entity type must not be null");

        CassandraPersistentEntity<?> entity = getRequiredPersistentEntity(cls);
        Statement<?> statement = getStatementFactory().select(query, entity).build();
        return slice(statement, cls);
    }

    /**
     * Applies the update to the rows selected by the query and reports whether the write
     * result says it was applied.
     *
     * @param query the query selecting rows; must not be {@code null}.
     * @param update the update to apply; must not be {@code null}.
     * @param cls the entity type; must not be {@code null}.
     */
    @Override
    public Mono<Boolean> update(Query query, Update update, Class<?> cls) throws DataAccessException {
        Assert.notNull(query, "Query must not be null");
        Assert.notNull(update, "Update must not be null");
        Assert.notNull(cls, "Entity type must not be null");

        Mono<WriteResult> result = doUpdate(query, update, cls, getTableName(cls));
        return result.map(WriteResult::wasApplied);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds the update statement for the query and executes it against the given table,
     * returning the first write result.
     */
    public Mono<WriteResult> doUpdate(Query query, Update update, Class<?> cls, CqlIdentifier cqlIdentifier) {
        CassandraPersistentEntity<?> entity = getRequiredPersistentEntity(cls);
        SimpleStatement statement = getStatementFactory().update(query, update, entity, cqlIdentifier).build();
        return getReactiveCqlOperations().execute(new StatementCallback(statement)).next();
    }

    /**
     * Deletes the rows selected by the query and reports whether the write result says the
     * delete was applied.
     *
     * @param query the query selecting rows; must not be {@code null}.
     * @param cls the entity type; must not be {@code null}.
     */
    @Override
    public Mono<Boolean> delete(Query query, Class<?> cls) throws DataAccessException {
        Assert.notNull(query, "Query must not be null");
        Assert.notNull(cls, "Entity type must not be null");

        Mono<WriteResult> result = doDelete(query, cls, getTableName(cls));
        return result.map(WriteResult::wasApplied);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds and executes the delete statement for the query. A {@link BeforeDeleteEvent} is
     * published on subscription and an {@link AfterDeleteEvent} once a write result arrives.
     */
    public Mono<WriteResult> doDelete(Query query, Class<?> cls, CqlIdentifier cqlIdentifier) {
        CassandraPersistentEntity<?> entity = getRequiredPersistentEntity(cls);
        SimpleStatement statement = getStatementFactory().delete(query, entity, cqlIdentifier).build();

        Flux<WriteResult> execution = getReactiveCqlOperations().execute(new StatementCallback(statement));
        return execution
                .doOnSubscribe(subscription -> maybeEmitEvent(new BeforeDeleteEvent(statement, cls, cqlIdentifier)))
                .next()
                .doOnNext(result -> maybeEmitEvent(new AfterDeleteEvent(statement, cls, cqlIdentifier)));
    }

    /**
     * Counts all rows of the table mapped to {@code cls} by running an empty query.
     *
     * @param cls the entity type; must not be {@code null}.
     */
    @Override
    public Mono<Long> count(Class<?> cls) {
        Assert.notNull(cls, "Entity type must not be null");

        Query everything = Query.empty();
        return doCount(everything, cls, getTableName(cls));
    }

    /**
     * Counts the rows matching the query in the table mapped to {@code cls}.
     *
     * @param query the query; must not be {@code null}.
     * @param cls the entity type; must not be {@code null}.
     */
    @Override
    public Mono<Long> count(Query query, Class<?> cls) throws DataAccessException {
        Assert.notNull(query, "Query must not be null");
        Assert.notNull(cls, "Entity type must not be null");

        CqlIdentifier table = getTableName(cls);
        return doCount(query, cls, table);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds and executes the count statement for the query; emits {@code 0} when the query
     * yields no value.
     */
    public Mono<Long> doCount(Query query, Class<?> cls, CqlIdentifier cqlIdentifier) {
        CassandraPersistentEntity<?> entity = getRequiredPersistentEntity(cls);
        Statement<?> countStatement = getStatementFactory().count(query, entity, cqlIdentifier).build();

        Mono<Long> counted = getReactiveCqlOperations().queryForObject(countStatement, Long.class);
        return counted.switchIfEmpty(Mono.just(0L));
    }

    /**
     * Checks whether a row with the given id exists by selecting it from the entity's table
     * and testing whether any row comes back.
     *
     * @param obj the id; must not be {@code null}.
     * @param cls the entity type; must not be {@code null}.
     */
    @Override
    public Mono<Boolean> exists(Object obj, Class<?> cls) {
        Assert.notNull(obj, "Id must not be null");
        Assert.notNull(cls, "Entity type must not be null");

        CassandraPersistentEntity<?> entity = getRequiredPersistentEntity(cls);
        Statement<?> lookup = getStatementFactory().selectOneById(obj, entity, entity.getTableName()).build();
        return getReactiveCqlOperations().queryForRows(lookup).hasElements();
    }

    /**
     * Checks whether at least one row of the table mapped to {@code cls} matches the query.
     *
     * @param query the query; must not be {@code null}.
     * @param cls the entity type; must not be {@code null}.
     */
    @Override
    public Mono<Boolean> exists(Query query, Class<?> cls) throws DataAccessException {
        Assert.notNull(query, "Query must not be null");
        Assert.notNull(cls, "Entity type must not be null");

        CqlIdentifier table = getTableName(cls);
        return doExists(query, cls, table);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Runs the query with a limit of one row against the given table and reports whether
     * any row was returned.
     */
    public Mono<Boolean> doExists(Query query, Class<?> cls, CqlIdentifier cqlIdentifier) {
        Query firstRowOnly = query.limit(1L);
        CassandraPersistentEntity<?> entity = getRequiredPersistentEntity(cls);
        Statement<?> statement = getStatementFactory().select(firstRowOnly, entity, cqlIdentifier).build();
        return getReactiveCqlOperations().queryForRows(statement).hasElements();
    }

    /**
     * Loads the entity of type {@code cls} with the given id, if present.
     *
     * @param obj the id; must not be {@code null}.
     * @param cls the entity type; must not be {@code null}.
     */
    @Override
    public <T> Mono<T> selectOneById(Object obj, Class<T> cls) {
        Assert.notNull(obj, "Id must not be null");
        Assert.notNull(cls, "Entity type must not be null");

        CassandraPersistentEntity<?> entity = getRequiredPersistentEntity(cls);
        CqlIdentifier table = getTableName(cls);
        Statement<?> lookup = getStatementFactory().selectOneById(obj, entity, table).build();
        return selectOne(lookup, cls);
    }

    /**
     * Inserts the entity with empty insert options and returns the entity carried by the
     * write result.
     */
    @Override
    public <T> Mono<T> insert(T t) {
        Mono<EntityWriteResult<T>> written = insert(t, InsertOptions.empty());
        return written.map(EntityWriteResult::getEntity);
    }

    /**
     * Inserts the entity into the table mapped to its class using the given options.
     *
     * @param t the entity to insert; must not be {@code null}.
     * @param insertOptions the options to apply; must not be {@code null}.
     * @return the write result together with the inserted entity.
     */
    @Override
    public <T> Mono<EntityWriteResult<T>> insert(T t, InsertOptions insertOptions) {
        Assert.notNull(t, "Entity must not be null");
        Assert.notNull(insertOptions, "InsertOptions must not be null");

        // The entity is passed as-is: casting it to ReactiveCassandraTemplate (as the previous
        // code did) neither type-checks nor could succeed at runtime for a mapped entity.
        CqlIdentifier table = getTableName(t.getClass());
        return doInsert(t, insertOptions, table);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Inserts the entity into the given table after running the before-convert callback.
     * For versioned entities the version property is initialised and the statement is issued
     * with {@code IF NOT EXISTS}, so a rejected insert surfaces as an optimistic locking failure.
     *
     * @param t the entity to insert.
     * @param writeOptions the write options applied to the statement.
     * @param cqlIdentifier the target table.
     * @return the write result together with the inserted entity.
     */
    public <T> Mono<EntityWriteResult<T>> doInsert(T t, WriteOptions writeOptions, CqlIdentifier cqlIdentifier) {
        return maybeCallBeforeConvert(t, cqlIdentifier).flatMap(entity -> {
            EntityOperations.AdaptibleEntity<T> source = this.entityOperations.forEntity(entity, getConverter().getConversionService());
            CassandraPersistentEntity<?> persistentEntity = getRequiredPersistentEntity(entity.getClass());

            boolean versioned = source.isVersionedEntity();
            T entityToUse = versioned ? source.initializeVersionProperty() : entity;

            StatementBuilder<RegularInsert> builder = getStatementFactory().insert(entityToUse, writeOptions, persistentEntity, cqlIdentifier);
            if (!versioned) {
                // The entity stays an entity here; it must not be cast to SimpleStatement.
                return doInsert(builder.build(), entityToUse, cqlIdentifier);
            }

            builder.apply(regularInsert -> regularInsert.ifNotExists());
            return doInsertVersioned(builder.build(), entityToUse, source, cqlIdentifier);
        });
    }

    /**
     * Saves a versioned entity and signals an {@link OptimisticLockingFailureException} when
     * the write result reports that the insert was not applied.
     */
    private <T> Mono<EntityWriteResult<T>> doInsertVersioned(SimpleStatement simpleStatement, T t, EntityOperations.AdaptibleEntity<T> adaptibleEntity, CqlIdentifier cqlIdentifier) {
        return executeSave(t, cqlIdentifier, simpleStatement, (result, sink) -> {
            if (!result.wasApplied()) {
                String message = String.format("Cannot insert entity %s with version %s into table %s as it already exists", t, adaptibleEntity.getVersion(), cqlIdentifier);
                sink.error(new OptimisticLockingFailureException(message));
                return;
            }
            sink.next(result);
        });
    }

    /**
     * Saves a non-versioned entity with the prepared insert statement; every write result is
     * passed through unchanged.
     */
    private <T> Mono<EntityWriteResult<T>> doInsert(SimpleStatement simpleStatement, T t, CqlIdentifier cqlIdentifier) {
        Mono<EntityWriteResult<T>> saved = executeSave(t, cqlIdentifier, simpleStatement);
        return saved;
    }

    /**
     * Updates the entity with empty update options and returns the entity carried by the
     * write result.
     */
    @Override
    public <T> Mono<T> update(T t) {
        Mono<EntityWriteResult<T>> written = update(t, UpdateOptions.empty());
        return written.map(EntityWriteResult::getEntity);
    }

    /**
     * Updates the entity in the table mapped to its class. The before-convert callback runs
     * first and the entity it returns is the one that gets written; versioned entities are
     * updated with a version condition.
     *
     * @param t the entity to update; must not be {@code null}.
     * @param updateOptions the options to apply; must not be {@code null}.
     * @return the write result together with the updated entity.
     */
    @Override
    public <T> Mono<EntityWriteResult<T>> update(T t, UpdateOptions updateOptions) {
        Assert.notNull(t, "Entity must not be null");
        Assert.notNull(updateOptions, "UpdateOptions must not be null");

        CassandraPersistentEntity<?> persistentEntity = getRequiredPersistentEntity(t.getClass());
        CqlIdentifier tableName = persistentEntity.getTableName();

        return maybeCallBeforeConvert(t, tableName).flatMap(entityToUpdate -> {
            // Work on the callback's result; previously it was discarded and the original
            // argument was saved, silently dropping any change made by the callback.
            EntityOperations.AdaptibleEntity<T> source = this.entityOperations.forEntity(entityToUpdate, getConverter().getConversionService());
            if (source.isVersionedEntity()) {
                return doUpdateVersioned(entityToUpdate, updateOptions, tableName, persistentEntity);
            }
            return doUpdate(entityToUpdate, updateOptions, tableName, persistentEntity);
        });
    }

    /**
     * Updates a versioned entity: the version is incremented, the statement is conditioned on
     * the version read before the increment, and a write that was not applied is signalled as
     * an {@link OptimisticLockingFailureException}.
     */
    private <T> Mono<EntityWriteResult<T>> doUpdateVersioned(T t, UpdateOptions updateOptions, CqlIdentifier cqlIdentifier, CassandraPersistentEntity<?> cassandraPersistentEntity) {
        EntityOperations.AdaptibleEntity<T> source = getEntityOperations().forEntity(t, getConverter().getConversionService());

        // Capture the current version before incrementing; it becomes the statement's condition.
        Number previousVersion = source.getVersion();
        T toSave = source.incrementVersion();

        SimpleStatement statement = source
                .appendVersionCondition(getStatementFactory().update(toSave, updateOptions, cassandraPersistentEntity, cqlIdentifier), previousVersion)
                .build();

        return executeSave(toSave, cqlIdentifier, statement, (result, sink) -> {
            if (!result.wasApplied()) {
                String message = String.format("Cannot save entity %s with version %s to table %s. Has it been modified meanwhile?", toSave, source.getVersion(), cqlIdentifier);
                sink.error(new OptimisticLockingFailureException(message));
                return;
            }
            sink.next(result);
        });
    }

    /**
     * Builds the update statement for a non-versioned entity and saves it.
     */
    private <T> Mono<EntityWriteResult<T>> doUpdate(T t, UpdateOptions updateOptions, CqlIdentifier cqlIdentifier, CassandraPersistentEntity<?> cassandraPersistentEntity) {
        SimpleStatement statement = getStatementFactory().update(t, updateOptions, cassandraPersistentEntity, cqlIdentifier).build();
        return executeSave(t, cqlIdentifier, statement);
    }

    /**
     * Deletes the entity with empty query options and emits the entity itself once a write
     * result has been received.
     */
    @Override
    public <T> Mono<T> delete(T t) {
        Mono<WriteResult> deleted = delete(t, QueryOptions.empty());
        return deleted.map(ignoredResult -> t);
    }

    /**
     * Deletes the entity from the table mapped to its class. Versioned entities are deleted
     * with an additional version condition.
     *
     * @param obj the entity to delete; must not be {@code null}.
     * @param queryOptions the options to apply; must not be {@code null}.
     */
    @Override
    public Mono<WriteResult> delete(Object obj, QueryOptions queryOptions) {
        Assert.notNull(obj, "Entity must not be null");
        Assert.notNull(queryOptions, "QueryOptions must not be null");

        EntityOperations.AdaptibleEntity<Object> source = this.entityOperations.forEntity(obj, getConverter().getConversionService());
        CqlIdentifier table = getRequiredPersistentEntity(obj.getClass()).getTableName();
        StatementBuilder<Delete> builder = getStatementFactory().delete(obj, queryOptions, getConverter(), table);

        if (source.isVersionedEntity()) {
            return doDeleteVersioned(source.appendVersionCondition(builder).build(), obj, source, table);
        }
        return doDelete(builder.build(), obj, table);
    }

    /**
     * Deletes a versioned entity and signals an {@link OptimisticLockingFailureException}
     * when the write result reports that the delete was not applied.
     */
    private Mono<WriteResult> doDeleteVersioned(SimpleStatement simpleStatement, Object obj, EntityOperations.AdaptibleEntity<Object> adaptibleEntity, CqlIdentifier cqlIdentifier) {
        return executeDelete(obj, cqlIdentifier, simpleStatement, (result, sink) -> {
            if (!result.wasApplied()) {
                String message = String.format("Cannot delete entity %s with version %s in table %s. Has it been modified meanwhile?", obj, adaptibleEntity.getVersion(), cqlIdentifier);
                sink.error(new OptimisticLockingFailureException(message));
                return;
            }
            sink.next(result);
        });
    }

    /**
     * Deletes a non-versioned entity; every write result is passed through unchanged.
     */
    private Mono<WriteResult> doDelete(SimpleStatement simpleStatement, Object obj, CqlIdentifier cqlIdentifier) {
        BiConsumer<WriteResult, SynchronousSink<WriteResult>> passThrough = (result, sink) -> sink.next(result);
        return executeDelete(obj, cqlIdentifier, simpleStatement, passThrough);
    }

    /**
     * Deletes the row with the given id from the table mapped to {@code cls}. A
     * {@link BeforeDeleteEvent} is published on subscription and an {@link AfterDeleteEvent}
     * once the execution emits its result.
     *
     * @param obj the id; must not be {@code null}.
     * @param cls the entity type; must not be {@code null}.
     */
    @Override
    public Mono<Boolean> deleteById(Object obj, Class<?> cls) {
        Assert.notNull(obj, "Id must not be null");
        Assert.notNull(cls, "Entity type must not be null");

        CassandraPersistentEntity<?> entity = getRequiredPersistentEntity(cls);
        CqlIdentifier table = entity.getTableName();
        SimpleStatement statement = getStatementFactory().deleteById(obj, entity, table).build();

        Mono<Boolean> execution = getReactiveCqlOperations().execute((Statement<?>) statement);
        return execution
                .doOnSubscribe(subscription -> maybeEmitEvent(new BeforeDeleteEvent(statement, cls, table)))
                .doOnNext(applied -> maybeEmitEvent(new AfterDeleteEvent(statement, cls, table)));
    }

    /**
     * Truncates the table mapped to {@code cls}. A {@link BeforeDeleteEvent} is published on
     * subscription and an {@link AfterDeleteEvent} once the execution emits its result.
     *
     * @param cls the entity type; must not be {@code null}.
     */
    @Override
    public Mono<Void> truncate(Class<?> cls) {
        Assert.notNull(cls, "Entity type must not be null");

        CqlIdentifier table = getTableName(cls);
        SimpleStatement statement = QueryBuilder.truncate(table).build();

        Mono<Boolean> execution = getReactiveCqlOperations().execute((Statement<?>) statement);
        return execution
                .doOnSubscribe(subscription -> maybeEmitEvent(new BeforeDeleteEvent(statement, cls, table)))
                .doOnNext(applied -> maybeEmitEvent(new AfterDeleteEvent(statement, cls, table)))
                .then();
    }

    /**
     * Starts a fluent delete operation for the given type, backed by this template.
     */
    @Override
    public ReactiveDeleteOperation.ReactiveDelete delete(Class<?> cls) {
        ReactiveDeleteOperationSupport support = new ReactiveDeleteOperationSupport(this);
        return support.delete(cls);
    }

    /**
     * Starts a fluent insert operation for the given type, backed by this template.
     */
    @Override
    public <T> ReactiveInsertOperation.ReactiveInsert<T> insert(Class<T> cls) {
        ReactiveInsertOperationSupport support = new ReactiveInsertOperationSupport(this);
        return support.insert(cls);
    }

    /**
     * Starts a fluent select operation for the given type, backed by this template.
     */
    @Override
    public <T> ReactiveSelectOperation.ReactiveSelect<T> query(Class<T> cls) {
        ReactiveSelectOperationSupport support = new ReactiveSelectOperationSupport(this);
        return support.query(cls);
    }

    /**
     * Starts a fluent update operation for the given type, backed by this template.
     */
    @Override
    public ReactiveUpdateOperation.ReactiveUpdate update(Class<?> cls) {
        ReactiveUpdateOperationSupport support = new ReactiveUpdateOperationSupport(this);
        return support.update(cls);
    }

    /**
     * Saves the entity with a handler that forwards every write result unchanged.
     */
    private <T> Mono<EntityWriteResult<T>> executeSave(T t, CqlIdentifier cqlIdentifier, SimpleStatement simpleStatement) {
        BiConsumer<EntityWriteResult<T>, SynchronousSink<EntityWriteResult<T>>> passThrough = (result, sink) -> sink.next(result);
        return executeSave(t, cqlIdentifier, simpleStatement, passThrough);
    }

    /**
     * Executes a save statement for the entity. On each subscription a {@link BeforeSaveEvent}
     * is published, the before-save callback runs, the statement is executed, the result is
     * passed through {@code biConsumer}, and an {@link AfterSaveEvent} is published for each
     * result the handler lets through.
     *
     * @param t the entity being saved.
     * @param cqlIdentifier the target table.
     * @param simpleStatement the statement to execute.
     * @param biConsumer handler deciding whether a write result is emitted or turned into an error.
     */
    private <T> Mono<EntityWriteResult<T>> executeSave(T t, CqlIdentifier cqlIdentifier, SimpleStatement simpleStatement, BiConsumer<EntityWriteResult<T>, SynchronousSink<EntityWriteResult<T>>> biConsumer) {
        return Mono.defer(() -> {
            // Deferred so the event and the callback run per subscription, not at assembly time.
            maybeEmitEvent(new BeforeSaveEvent(t, cqlIdentifier, simpleStatement));

            Mono<T> prepared = maybeCallBeforeSave(t, cqlIdentifier, simpleStatement);
            Flux<EntityWriteResult<T>> saved = prepared.flatMapMany(entity -> getReactiveCqlOperations()
                    .execute(new StatementCallback(simpleStatement))
                    .map(writeResult -> EntityWriteResult.of(writeResult, entity))
                    .handle(biConsumer)
                    .doOnNext(result -> maybeEmitEvent(new AfterSaveEvent(entity, cqlIdentifier))));
            return saved.next();
        });
    }

    /**
     * Executes a delete statement for the entity. A {@link BeforeDeleteEvent} is published
     * when this method is invoked, and an {@link AfterDeleteEvent} for each write result that
     * {@code biConsumer} lets through. No save events are published for deletes.
     *
     * @param obj the entity being deleted.
     * @param cqlIdentifier the table the entity is deleted from.
     * @param simpleStatement the delete statement to execute.
     * @param biConsumer handler deciding whether a write result is emitted or turned into an error.
     * @return the first write result produced by the execution.
     */
    private Mono<WriteResult> executeDelete(Object obj, CqlIdentifier cqlIdentifier, SimpleStatement simpleStatement, BiConsumer<WriteResult, SynchronousSink<WriteResult>> biConsumer) {
        maybeEmitEvent(new BeforeDeleteEvent(simpleStatement, obj.getClass(), cqlIdentifier));

        // The former doOnSubscribe hook published a BeforeSaveEvent here, which told save
        // listeners that the entity being deleted was about to be saved; it has been removed.
        return getReactiveCqlOperations().execute(new StatementCallback(simpleStatement))
                .map(writeResult -> EntityWriteResult.of(writeResult, obj))
                .handle(biConsumer)
                .doOnNext(deleted -> maybeEmitEvent(new AfterDeleteEvent(simpleStatement, obj.getClass(), cqlIdentifier)))
                .next();
    }

    /**
     * Reads the request page size from the default profile of the driver configuration,
     * falling back to 5000 when the option is not set.
     */
    private int getConfiguredPageSize(DriverContext driverContext) {
        final int fallbackPageSize = 5000;
        return driverContext.getConfig()
                .getDefaultProfile()
                .getInt(DefaultDriverOption.REQUEST_PAGE_SIZE, fallbackPageSize);
    }

    /**
     * Determines the fetch size for the statement. Precedence: a positive page size on the
     * statement itself; then the fetch size of the CQL operations when they are a
     * {@link CassandraAccessor} and the value is not {@code -1}; otherwise the page size
     * configured in the driver context of the session.
     */
    private Mono<Integer> getEffectiveFetchSize(Statement<?> statement) {
        int statementPageSize = statement.getPageSize();
        if (statementPageSize > 0) {
            return Mono.just(Integer.valueOf(statementPageSize));
        }

        ReactiveCqlOperations operations = getReactiveCqlOperations();
        if (operations instanceof CassandraAccessor) {
            int accessorFetchSize = ((CassandraAccessor) operations).getFetchSize();
            if (accessorFetchSize != -1) {
                return Mono.just(Integer.valueOf(accessorFetchSize));
            }
        }

        // Fall back to the driver configuration, which is only reachable through a session.
        return operations
                .execute(session -> Mono.just(Integer.valueOf(getConfiguredPageSize(session.getContext()))))
                .single();
    }

    /**
     * Creates the row mapping function used by the select methods. For every row it publishes
     * an {@link AfterLoadEvent}, reads the row through the converter, wraps the result in a
     * projection when {@code cls2} is an interface, and publishes an {@link AfterConvertEvent}
     * when the result is not {@code null}.
     *
     * @param cls the entity type the query was issued for.
     * @param cls2 the type results are returned as.
     * @param cqlIdentifier the table the rows come from.
     * @return a function mapping a row to a result of type {@code T}.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private <T> Function<Row, T> getMapper(Class<?> cls, Class<T> cls2, CqlIdentifier cqlIdentifier) {
        Class<?> typeToRead = resolveTypeToRead(cls, cls2);
        return row -> {
            maybeEmitEvent(new AfterLoadEvent(row, cls2, cqlIdentifier));

            Object source = getConverter().read(typeToRead, row);
            // Unchecked cast: resolveTypeToRead picks a type to read for a non-interface cls2,
            // and an interface cls2 goes through the projection factory. Without the cast the
            // lambda would return Object from a Function<Row, T>.
            T result = (T) (cls2.isInterface() ? getProjectionFactory().createProjection(cls2, source) : source);

            if (result != null) {
                maybeEmitEvent(new AfterConvertEvent(row, result, cqlIdentifier));
            }
            return result;
        };
    }

    /**
     * Chooses the type the converter should read: the entity type {@code cls} when
     * {@code cls2} is an interface or a supertype of {@code cls}, otherwise {@code cls2}.
     */
    private Class<?> resolveTypeToRead(Class<?> cls, Class<?> cls2) {
        if (cls2.isInterface()) {
            return cls;
        }
        if (cls2.isAssignableFrom(cls)) {
            return cls;
        }
        return cls2;
    }

    /**
     * Creates a {@link MappingCassandraConverter} and invokes {@code afterPropertiesSet()} on
     * it before returning it.
     */
    private static MappingCassandraConverter newConverter() {
        MappingCassandraConverter defaultConverter = new MappingCassandraConverter();
        defaultConverter.afterPropertiesSet();
        return defaultConverter;
    }

    /**
     * Publishes the event when an event publisher is configured; otherwise does nothing.
     *
     * @param e the event to publish.
     */
    protected <E extends CassandraMappingEvent<T>, T> void maybeEmitEvent(E e) {
        if (this.eventPublisher == null) {
            return;
        }
        this.eventPublisher.publishEvent(e);
    }

    /**
     * Invokes the {@link ReactiveBeforeConvertCallback}s for the entity when entity callbacks
     * are configured; otherwise emits the entity unchanged.
     *
     * @param t the entity.
     * @param cqlIdentifier the table name passed to the callbacks.
     */
    protected <T> Mono<T> maybeCallBeforeConvert(T t, CqlIdentifier cqlIdentifier) {
        if (this.entityCallbacks == null) {
            return Mono.just(t);
        }
        return this.entityCallbacks.callback(ReactiveBeforeConvertCallback.class, t, new Object[]{cqlIdentifier});
    }

    /**
     * Invokes the {@link ReactiveBeforeSaveCallback}s for the entity when entity callbacks
     * are configured; otherwise emits the entity unchanged.
     *
     * @param t the entity.
     * @param cqlIdentifier the table name passed to the callbacks.
     * @param statement the statement passed to the callbacks.
     */
    protected <T> Mono<T> maybeCallBeforeSave(T t, CqlIdentifier cqlIdentifier, Statement<?> statement) {
        if (this.entityCallbacks == null) {
            return Mono.just(t);
        }
        return this.entityCallbacks.callback(ReactiveBeforeSaveCallback.class, t, new Object[]{cqlIdentifier, statement});
    }
}
