package pl.allegro.tech.hermes.mock;

import com.jayway.awaitility.Awaitility;
import com.jayway.awaitility.core.ConditionTimeoutException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.avro.Schema;

/* loaded from: input_file:pl/allegro/tech/hermes/mock/HermesMockExpect.class */
/**
 * Expectation helpers for the Hermes mock: each method waits (up to a configured number of
 * seconds) for the mock to have received a given number of requests on a topic and, for the
 * typed variants, additionally checks the number of messages returned by {@link HermesMockQuery}.
 *
 * <p>All failures are reported as {@link HermesMockException}.
 */
public class HermesMockExpect {
    private final HermesMockHelper hermesMockHelper;
    private final HermesMockQuery hermesMockQuery;
    private final int awaitSeconds;

    /**
     * Creates the expectation helper.
     *
     * @param hermesMockHelper helper used to verify requests and to validate/deserialize Avro
     *                         payloads; also used to build the internal {@link HermesMockQuery}
     * @param i                maximum time, in seconds, to wait for the expected requests
     */
    public HermesMockExpect(HermesMockHelper hermesMockHelper, int i) {
        this.hermesMockHelper = hermesMockHelper;
        this.hermesMockQuery = new HermesMockQuery(hermesMockHelper);
        this.awaitSeconds = i;
    }

    /**
     * Shorthand for {@link #messagesOnTopic(String, int)} with a count of 1.
     *
     * @param str topic name
     */
    public void singleMessageOnTopic(String str) {
        messagesOnTopic(str, 1);
    }

    /**
     * Shorthand for {@link #jsonMessagesOnTopicAs(String, int, Class)} with a count of 1.
     *
     * @param str topic name
     * @param cls type the JSON messages are read as
     */
    public <T> void singleJsonMessageOnTopicAs(String str, Class<T> cls) {
        jsonMessagesOnTopicAs(str, 1, cls);
    }

    /**
     * Shorthand for {@link #avroMessagesOnTopic(String, int, Schema)} with a count of 1.
     *
     * @param str    topic name
     * @param schema Avro schema every raw message is validated against
     */
    public void singleAvroMessageOnTopic(String str, Schema schema) {
        avroMessagesOnTopic(str, 1, schema);
    }

    /**
     * Waits until the request verification for {@code i} requests on topic {@code str} passes.
     *
     * @param str topic name
     * @param i   expected number of requests
     * @throws HermesMockException if the verification does not pass within the configured timeout
     */
    public void messagesOnTopic(String str, int i) {
        expectMessages(str, i);
    }

    /**
     * Waits for {@code i} requests on the topic, then checks that all JSON messages read as
     * {@code cls} number exactly {@code i}.
     *
     * @param str topic name
     * @param i   expected number of messages
     * @param cls type the JSON messages are read as
     * @throws HermesMockException on timeout or when the message count differs from {@code i}
     */
    public <T> void jsonMessagesOnTopicAs(String str, int i, Class<T> cls) {
        assertMessages(str, i, () -> this.hermesMockQuery.allJsonMessagesAs(str, cls));
    }

    /**
     * Waits for {@code i} requests on the topic, then checks that the JSON messages selected by
     * {@code predicate} number exactly {@code i}.
     *
     * @param str       topic name
     * @param i         expected number of messages
     * @param cls       type the JSON messages are read as
     * @param predicate filter passed to the query for matching messages
     * @throws HermesMockException on timeout or when the matching count differs from {@code i}
     */
    public <T> void jsonMessagesOnTopicAs(String str, int i, Class<T> cls, Predicate<T> predicate) {
        assertMessages(str, i, () -> this.hermesMockQuery.matchingJsonMessagesAs(str, cls, predicate));
    }

    /**
     * Waits for {@code i} requests on the topic, validates every raw Avro message against
     * {@code schema}, then checks that the raw messages number exactly {@code i}.
     *
     * @param str    topic name
     * @param i      expected number of messages
     * @param schema Avro schema every raw message is validated against
     * @throws HermesMockException on timeout or when the message count differs from {@code i}
     */
    public void avroMessagesOnTopic(String str, int i, Schema schema) {
        assertMessages(str, i, () -> validateAvroMessages(str, schema));
    }

    /**
     * Waits for {@code i} requests on the topic, deserializes every raw Avro message to
     * {@code cls}, keeps those accepted by {@code predicate}, and checks that exactly {@code i}
     * remain.
     *
     * @param str       topic name
     * @param i         expected number of matching messages
     * @param schema    Avro schema used for deserialization
     * @param cls       target type of the deserialized messages
     * @param predicate filter applied to the deserialized messages
     * @throws HermesMockException on timeout or when the matching count differs from {@code i}
     */
    public <T> void avroMessagesOnTopic(String str, int i, Schema schema, Class<T> cls, Predicate<T> predicate) {
        assertMessages(str, i, () -> validateAvroMessages(str, schema, cls, predicate));
    }

    /**
     * Two-step check: first wait for the request verification, then compare the size of the
     * supplied message list with the expected count. The supplier is only invoked after the wait
     * has succeeded.
     */
    private <T> void assertMessages(String topic, int expectedCount, Supplier<List<T>> messages) {
        expectMessages(topic, expectedCount);
        expectSpecificMessages(expectedCount, messages.get());
    }

    /**
     * Polls {@code verifyRequest} through Awaitility for at most {@code awaitSeconds} seconds and
     * converts an Awaitility timeout into a {@link HermesMockException} (cause preserved).
     */
    private void expectMessages(String topic, int expectedCount) {
        try {
            // Kept as a block-bodied (void) lambda so it binds to the same until(...) overload
            // as before.
            Awaitility.await().atMost(this.awaitSeconds, TimeUnit.SECONDS).until(() -> {
                this.hermesMockHelper.verifyRequest(expectedCount, topic);
            });
        } catch (ConditionTimeoutException timeout) {
            throw new HermesMockException("Hermes mock did not receive " + expectedCount + " messages.", timeout);
        }
    }

    /**
     * Fails when a non-null message list has a size different from the expected count.
     * A {@code null} list is deliberately not treated as a failure.
     */
    private <T> void expectSpecificMessages(int expectedCount, List<T> messages) {
        if (messages == null) {
            return;
        }
        int actualCount = messages.size();
        if (actualCount != expectedCount) {
            throw new HermesMockException(
                    "Hermes mock did not receive " + expectedCount + " messages, got " + actualCount);
        }
    }

    /**
     * Returns the raw Avro messages of the topic after validating each one against
     * {@code schema}. Validation is done in an explicit loop rather than in {@code Stream.peek},
     * which is intended for debugging and not for required side effects.
     */
    private List<byte[]> validateAvroMessages(String topic, Schema schema) {
        List<byte[]> rawMessages =
                this.hermesMockQuery.allAvroRawMessages(topic).stream().collect(Collectors.toList());
        for (byte[] rawMessage : rawMessages) {
            this.hermesMockHelper.validateAvroSchema(rawMessage, schema);
        }
        return rawMessages;
    }

    /**
     * Deserializes every raw Avro message of the topic to {@code type} using {@code schema} and
     * returns those accepted by {@code filter}.
     */
    private <T> List<T> validateAvroMessages(String topic, Schema schema, Class<T> type, Predicate<T> filter) {
        return this.hermesMockQuery.allAvroRawMessages(topic).stream()
                .map(rawMessage -> this.hermesMockHelper.deserializeAvro(rawMessage, schema, type))
                .filter(filter)
                .collect(Collectors.toList());
    }
}
