package ch.voulgarakis.spring.boot.starter.quickfixj.session;

import ch.voulgarakis.spring.boot.starter.quickfixj.FixSessionInterface;
import ch.voulgarakis.spring.boot.starter.quickfixj.exception.QuickFixJConfigurationException;
import ch.voulgarakis.spring.boot.starter.quickfixj.exception.SessionDroppedException;
import ch.voulgarakis.spring.boot.starter.quickfixj.exception.SessionException;
import ch.voulgarakis.spring.boot.starter.quickfixj.session.utils.FixMessageUtils;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import quickfix.Message;
import quickfix.Session;
import quickfix.SessionID;

/* loaded from: input_file:ch/voulgarakis/spring/boot/starter/quickfixj/session/AbstractFixSession.class */
/**
 * Base implementation of {@link FixSessionInterface} that fans incoming FIX messages and session
 * errors out to a set of registered {@link MessageSink}s.
 *
 * <p>The class also remembers the last {@link SessionDroppedException} so that sinks created while
 * the session is down are failed immediately instead of waiting for a message that cannot arrive.
 *
 * <p>Several members below were {@code protected} or package-private in the original source and
 * were widened to {@code public} by the decompiler; the widened visibility is kept so existing
 * callers continue to compile.
 */
public class AbstractFixSession implements FixSessionInterface {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractFixSession.class);

    /** FIX MsgType (tag 35) value of a Logout message; such messages are exempt from the sink-count diagnostics. */
    private static final String LOGOUT_MSG_TYPE = "5";

    /** Sinks currently interested in messages of this session. Shared with every {@link MessageSink} created here. */
    private final Set<MessageSink> sinks = ConcurrentHashMap.newKeySet();

    /** Last recorded session drop, or {@code null} when no drop is recorded. */
    private final AtomicReference<SessionDroppedException> loggedOut = new AtomicReference<>();

    private SessionID sessionId;
    private String sessionName;

    /** Creates a session whose id and name are assigned later via the setters. */
    public AbstractFixSession() {
    }

    /**
     * Creates a session bound to the given id.
     *
     * @param sessionID the QuickFIX/J session id this instance represents
     */
    public AbstractFixSession(SessionID sessionID) {
        this.sessionId = sessionID;
    }

    /**
     * Delivers a received message to every sink whose selector accepts it.
     *
     * @param message the incoming FIX message
     */
    public void received(Message message) {
        notifySubscribers(message, sink -> sink.next(message));
    }

    /**
     * Updates the logged-out state from the given exception and then signals the error to every
     * sink whose selector accepts the exception's FIX message.
     *
     * @param sessionException the session error; its FIX message may be {@code null}
     */
    public void error(SessionException sessionException) {
        loggedOut(sessionException);
        notifySubscribers(sessionException.getFixMessage(), sink -> sink.error(sessionException));
    }

    /** Clears any recorded session drop. */
    public void loggedOn() {
        loggedOut(null);
    }

    /**
     * Records or clears the logged-out state.
     *
     * <p>A {@link SessionDroppedException} is stored; anything else (including {@code null})
     * clears the stored state.
     *
     * @param sessionException the exception that triggered the state change, or {@code null}
     */
    private void loggedOut(SessionException sessionException) {
        if (sessionException instanceof SessionDroppedException) {
            SessionDroppedException dropped = (SessionDroppedException) sessionException;
            if (Objects.nonNull(sessionException.getFixMessage())) {
                // A drop that carries a FIX message always replaces whatever was recorded before.
                this.loggedOut.set(dropped);
            } else {
                // A drop without a FIX message must not overwrite an already recorded drop.
                this.loggedOut.compareAndSet(null, dropped);
            }
        } else {
            this.loggedOut.set(null);
        }
    }

    /**
     * Applies {@code consumer} to every sink whose selector matches {@code message} and logs a
     * diagnostic based on how many sinks matched.
     *
     * <p>No diagnostic is logged when {@code message} is {@code null} or is a Logout message.
     *
     * @param message  the message to match sinks against; may be {@code null}, in which case the
     *                 selectors are still evaluated with {@code null}
     * @param consumer the action to run on each matching sink
     */
    private synchronized void notifySubscribers(Message message, Consumer<MessageSink> consumer) {
        // NOTE(review): sink callbacks run inside a parallel stream while this monitor is held, so
        // a callback that re-enters this method from a worker thread would block. Kept as-is to
        // preserve the existing callback threading.
        int notified = this.sinks.parallelStream().mapToInt(sink -> {
            boolean matches = sink.getMessageSelector().test(message);
            if (matches) {
                consumer.accept(sink);
            }
            return matches ? 1 : 0;
        }).sum();

        if (Objects.isNull(message) || FixMessageUtils.isMessageOfType(message, LOGOUT_MSG_TYPE)) {
            return;
        }
        if (notified == 0) {
            LOG.warn("Message received could not be associated with any Request. Message: {}", message);
        } else if (notified > 1) {
            LOG.warn("Message received was  associated with {} Requests. Suspicious subscriptions. Message: {}", notified, message);
        } else if (LOG.isDebugEnabled()) {
            LOG.debug("Notified sink for message: {}", message);
        }
    }

    /**
     * Creates a new sink backed by this session's sink set.
     *
     * <p>If a session drop is currently recorded, the new sink is failed with it right away.
     *
     * @param predicate selector deciding which messages the sink receives
     * @param consumer  callback handed to the sink for messages
     * @param consumer2 callback handed to the sink for errors
     * @return the newly created sink
     */
    public MessageSink createSink(Predicate<Message> predicate, Consumer<Message> consumer, Consumer<Throwable> consumer2) {
        MessageSink sink = new MessageSink(this.sinks, predicate, consumer, consumer2);
        SessionDroppedException dropped = this.loggedOut.get();
        if (Objects.nonNull(dropped)) {
            sink.error(dropped);
        }
        return sink;
    }

    /**
     * Assigns the session id. Setting the same id again is a no-op.
     *
     * @param sessionID the id to assign
     * @throws QuickFixJConfigurationException if an id is already set and differs from {@code sessionID}
     */
    public final void setSessionId(SessionID sessionID) {
        if (Objects.isNull(this.sessionId)) {
            this.sessionId = sessionID;
        } else if (!Objects.equals(sessionID, this.sessionId)) {
            throw new QuickFixJConfigurationException("Not allowed to set SessionId more than once.");
        }
    }

    /**
     * Returns the session id.
     *
     * @return the session id, never {@code null}
     * @throws QuickFixJConfigurationException if no id has been set
     */
    @Override // ch.voulgarakis.spring.boot.starter.quickfixj.FixSessionInterface
    public SessionID getSessionId() {
        if (Objects.nonNull(this.sessionId)) {
            return this.sessionId;
        }
        throw new QuickFixJConfigurationException("SessionId is not set.");
    }

    /**
     * Assigns the session name. Setting the same name again is a no-op.
     *
     * @param str the name to assign
     * @throws QuickFixJConfigurationException if a name is already set and differs from {@code str}
     */
    public final void setSessionName(String str) {
        if (Objects.isNull(this.sessionName)) {
            this.sessionName = str;
        } else if (!Objects.equals(str, this.sessionName)) {
            throw new QuickFixJConfigurationException("Not allowed to set SessionName more than once.");
        }
    }

    /**
     * Returns the session name.
     *
     * @return the session name, never {@code null}
     * @throws QuickFixJConfigurationException if no name has been set
     */
    @Override // ch.voulgarakis.spring.boot.starter.quickfixj.FixSessionInterface
    public String getSessionName() {
        // Guard on the name itself: checking sessionId here returned null for an unnamed session
        // and wrongly reported a missing name when only the id was absent.
        if (Objects.nonNull(this.sessionName)) {
            return this.sessionName;
        }
        throw new QuickFixJConfigurationException("SessionName is not set.");
    }

    /**
     * Reports whether the underlying QuickFIX/J session is logged on.
     *
     * @return the logged-on flag of the session looked up by {@link #getSessionId()}
     * @throws QuickFixJConfigurationException if the id is not set or no session is registered for it
     */
    @Override // ch.voulgarakis.spring.boot.starter.quickfixj.FixSessionInterface
    public boolean isLoggedOn() {
        Session session = Session.lookupSession(getSessionId());
        if (Objects.nonNull(session)) {
            return session.isLoggedOn();
        }
        throw new QuickFixJConfigurationException("Session does not exist.");
    }

    /**
     * Returns the number of sinks currently held by this session.
     *
     * @return the size of the sink set
     */
    public int sinkSize() {
        return this.sinks.size();
    }
}
