package ch.voulgarakis.spring.boot.starter.quickfixj.flux;

import ch.voulgarakis.spring.boot.starter.quickfixj.session.FixSessionManager;
import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import quickfix.Message;
import quickfix.SessionID;
import quickfix.field.QuoteID;
import quickfix.field.QuoteReqID;
import quickfix.fix43.Quote;
import quickfix.fix43.QuoteRequest;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

/**
 * Exercises {@link ReactiveFixSessionImpl#sendAndSubscribe} by sending a {@link QuoteRequest}
 * and then feeding {@link Quote} messages straight into the {@link FixSessionManager} from
 * several threads, verifying how many of them reach the subscriber.
 */
@ExtendWith({SpringExtension.class})
@SpringBootTest(classes = {ReactiveFixSessionTestContext.class})
@DirtiesContext
@TestPropertySource({"classpath:fixSessionTest.properties"})
public class ReactiveFixSessionTest {
    private static final Logger LOG = LoggerFactory.getLogger(ReactiveFixSessionTest.class);
    private static final SessionID SESSION_ID = new SessionID("FIX.4.3", "TEST_CLIENT2", "FIX");

    @Autowired
    private FixSessionManager sessionManager;

    @Autowired
    @Qualifier("TEST2")
    private ReactiveFixSessionImpl fixSession;

    /**
     * Subscribes with a request whose QuoteReqID is "0", triggers the quote bursts on
     * subscription, and expects two groups of 20 messages within the 2 second timeout.
     */
    @Test
    public void testBurstSubscription() {
        QuoteRequest quoteRequest = new QuoteRequest(new QuoteReqID(Long.toString(0L)));
        StepVerifier.create(this.fixSession.sendAndSubscribe(() -> quoteRequest)
                        // Start producing quotes only once the verifier has subscribed.
                        .doOnSubscribe(subscription -> burstOf20QuotesWithDifferentIdEvery100millis()))
                .expectSubscription()
                .expectNoEvent(Duration.ofMillis(100L))
                .expectNextCount(20L)
                .thenAwait(Duration.ofMillis(200L))
                .expectNextCount(20L)
                .thenCancel()
                .verify(Duration.ofSeconds(2L));
    }

    /**
     * Emits ticks 0 and 1 at 100 ms intervals and replays that sequence once; for every tick,
     * 20 cloned quotes carrying the tick value as QuoteID and QuoteReqID are handed to the
     * session manager on parallel rails.
     */
    private void burstOf20QuotesWithDifferentIdEvery100millis() {
        Flux.interval(Duration.ofMillis(100L)).take(2L).repeat(1L).flatMap(tick -> {
            LOG.debug("tick: {}", tick);
            String id = Long.toString(tick);
            Quote quote = new Quote(new QuoteID(id));
            quote.set(new QuoteReqID(id));
            // repeat(19) yields the quote 20 times in total; each emission is cloned so the
            // parallel consumers never share one message instance.
            return Flux.just(quote).repeat(19L).map(q -> q.clone());
        }).parallel().runOn(Schedulers.elastic()).subscribe(obj -> {
            LOG.debug("Sending to Session Manager: {}", obj);
            this.sessionManager.fromApp((Message) obj, SESSION_ID);
        });
        LOG.info("Sending burst of quotes");
    }

    /**
     * Generates 1000 random ids in the range [0, 5), sends one quote per id, and expects as many
     * messages as there are ids equal to 0 (the QuoteReqID used by the request).
     * NOTE(review): presumably the session delivers only quotes whose QuoteReqID matches the
     * request; that routing is not visible in this file.
     */
    @Test
    public void testRandomSubscription() {
        // Typed as List<Long> (not a raw List) so the filter lambda below sees Long elements.
        List<Long> quoteIds = new Random().longs(1000L, 0L, 5L).boxed().collect(Collectors.toList());
        long count = quoteIds.stream().filter(id -> id == 0L).count();
        LOG.info("Random quotes: {}", count);
        AtomicInteger received = new AtomicInteger();
        QuoteRequest quoteRequest = new QuoteRequest(new QuoteReqID(Long.toString(0L)));
        StepVerifier.create(this.fixSession.sendAndSubscribe(() -> quoteRequest)
                        // Start producing quotes only once the verifier has subscribed.
                        .doOnSubscribe(subscription -> randomQuotes(quoteIds))
                        .doOnNext(message -> LOG.info("Progress: {}/{}", received.incrementAndGet(), count)))
                .expectSubscription()
                .expectNextCount(count)
                .thenCancel()
                .verify(Duration.ofSeconds(10L));
    }

    /**
     * After a 100 ms delay, builds one quote per entry of {@code list} (the entry is used as both
     * QuoteID and QuoteReqID) and hands each quote to the session manager on parallel rails.
     *
     * @param list ids to turn into quotes
     */
    private void randomQuotes(List<Long> list) {
        Mono.delay(Duration.ofMillis(100L)).thenMany(Flux.fromIterable(list).map(value -> {
            LOG.debug("tick: {}", value);
            String id = Long.toString(value);
            Quote quote = new Quote(new QuoteID(id));
            quote.set(new QuoteReqID(id));
            return quote;
        }).parallel().runOn(Schedulers.elastic())).subscribe(quote -> {
            LOG.debug("Sending to Session Manager: {}", quote);
            this.sessionManager.fromApp(quote, SESSION_ID);
        });
        LOG.info("Sending random quotes");
    }
}
