package ch.voulgarakis.spring.boot.starter.quickfixj.flux;

import ch.voulgarakis.spring.boot.starter.quickfixj.exception.QuickFixJException;
import ch.voulgarakis.spring.boot.starter.quickfixj.exception.SessionException;
import ch.voulgarakis.spring.boot.starter.quickfixj.session.AbstractFixSession;
import ch.voulgarakis.spring.boot.starter.quickfixj.session.FixSessionUtils;
import ch.voulgarakis.spring.boot.starter.quickfixj.session.MessageSink;
import ch.voulgarakis.spring.boot.starter.quickfixj.session.utils.RefIdSelector;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import quickfix.Message;
import quickfix.Session;
import quickfix.SessionID;
import quickfix.SessionNotFound;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/* loaded from: input_file:ch/voulgarakis/spring/boot/starter/quickfixj/flux/ReactiveFixSessionImpl.class */
public class ReactiveFixSessionImpl extends AbstractFixSession implements ReactiveFixSession {
    private Counter messagesReceived;
    private Counter messagesSent;
    private Counter rejections;

    public ReactiveFixSessionImpl() {
    }

    public ReactiveFixSessionImpl(SessionID sessionID) {
        super(sessionID);
    }

    @Autowired(required = false)
    public void setMeterRegistry(MeterRegistry meterRegistry) {
        String extractFixSessionName = FixSessionUtils.extractFixSessionName(this);
        if (StringUtils.isBlank(extractFixSessionName)) {
            return;
        }
        Gauge.builder("quickfixj.flux.connection", () -> {
            return Integer.valueOf(isLoggedOn() ? 1 : 0);
        }).description("Connection state of reactive fix session").tag("fixSessionName", extractFixSessionName).register(meterRegistry);
        Gauge.builder("quickfixj.flux.subscribers", this::sinkSize).description("Number of subscribers on reactive fix session").tag("fixSessionName", extractFixSessionName).register(meterRegistry);
        this.messagesReceived = Counter.builder("quickfixj.flux.messages.received").description("Number of received FIX messages on reactive fix session").tag("fixSessionName", extractFixSessionName).baseUnit("messages").register(meterRegistry);
        this.messagesSent = Counter.builder("quickfixj.flux.messages.sent").description("Number of sent FIX messages on reactive fix session").tag("fixSessionName", extractFixSessionName).baseUnit("messages").register(meterRegistry);
        this.rejections = Counter.builder("quickfixj.flux.rejections").description("Number of received FIX rejections on reactive fix session").tag("fixSessionName", extractFixSessionName).baseUnit("rejects").register(meterRegistry);
    }

    protected void received(Message message) {
        if (Objects.nonNull(this.messagesReceived)) {
            this.messagesReceived.increment();
        }
        super.received(message);
    }

    protected void error(SessionException sessionException) {
        if (Objects.nonNull(this.rejections)) {
            this.rejections.increment();
        }
        super.error(sessionException);
    }

    @Override // ch.voulgarakis.spring.boot.starter.quickfixj.flux.ReactiveFixSession
    public Flux<Message> subscribe(Predicate<Message> predicate) {
        EmitterProcessor create = EmitterProcessor.create();
        FluxSink sink = create.sink();
        Objects.requireNonNull(sink);
        Consumer consumer = (v1) -> {
            r2.next(v1);
        };
        Objects.requireNonNull(sink);
        MessageSink createSink = createSink(predicate, consumer, sink::error);
        Objects.requireNonNull(createSink);
        sink.onDispose(createSink::dispose);
        return create.onBackpressureLatest();
    }

    @Override // ch.voulgarakis.spring.boot.starter.quickfixj.flux.ReactiveFixSession
    public Mono<Message> send(Supplier<Message> supplier) {
        return Mono.defer(() -> {
            try {
                Message message = (Message) supplier.get();
                Session.sendToTarget(message, getSessionId());
                if (Objects.nonNull(this.messagesSent)) {
                    this.messagesSent.increment();
                }
                return Mono.just(message);
            } catch (SessionNotFound e) {
                if (Objects.nonNull(this.rejections)) {
                    this.rejections.increment();
                }
                return Mono.error(new QuickFixJException(e));
            }
        }).metrics();
    }

    @Override // ch.voulgarakis.spring.boot.starter.quickfixj.flux.ReactiveFixSession
    public Flux<Message> sendAndSubscribe(Supplier<Message> supplier, Function<Message, RefIdSelector> function) {
        return send(supplier).flatMapMany(message -> {
            return subscribe((RefIdSelector) function.apply(message));
        }).metrics();
    }
}
