/*
 * Decompiled with CFR 0.152.
 */
package de.quantummaid.eventmaid.internal.pipe;

import de.quantummaid.eventmaid.exceptions.AlreadyClosedException;
import de.quantummaid.eventmaid.internal.exceptions.BubbleUpWrappedException;
import de.quantummaid.eventmaid.internal.pipe.Pipe;
import de.quantummaid.eventmaid.internal.pipe.PipeStatusInformation;
import de.quantummaid.eventmaid.internal.pipe.exceptions.PipeErrorHandler;
import de.quantummaid.eventmaid.internal.pipe.statistics.PipeStatistics;
import de.quantummaid.eventmaid.internal.pipe.statistics.PipeStatisticsCollector;
import de.quantummaid.eventmaid.internal.pipe.transport.TransportMechanism;
import de.quantummaid.eventmaid.subscribing.ConsumerSubscriber;
import de.quantummaid.eventmaid.subscribing.Subscriber;
import de.quantummaid.eventmaid.subscribing.SubscriptionId;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public final class PipeImpl<T>
implements Pipe<T> {
    private final TransportMechanism<T> transportMechanism;
    private final PipeStatisticsCollector statisticsCollector;
    private final List<Subscriber<T>> subscribers;
    private final PipeErrorHandler<T> errorHandler;
    private volatile boolean closedAlreadyCalled;

    public PipeImpl(TransportMechanism<T> transportMechanism, PipeStatisticsCollector statisticsCollector, List<Subscriber<T>> subscribers, PipeErrorHandler<T> errorHandler) {
        this.transportMechanism = transportMechanism;
        this.statisticsCollector = statisticsCollector;
        this.subscribers = subscribers;
        this.errorHandler = errorHandler;
    }

    @Override
    public void send(T message) {
        if (this.closedAlreadyCalled) {
            throw new AlreadyClosedException();
        }
        this.transport(message);
    }

    private void transport(T message) {
        try {
            this.transportMechanism.transport(message);
        }
        catch (BubbleUpWrappedException e) {
            this.errorHandler.handleBubbledUpException(message, e);
        }
    }

    @Override
    public SubscriptionId subscribe(Subscriber<T> subscriber) {
        if (!this.closedAlreadyCalled) {
            this.subscribers.add(subscriber);
            return subscriber.getSubscriptionId();
        }
        throw new AlreadyClosedException();
    }

    @Override
    public SubscriptionId subscribe(Consumer<T> consumer) {
        ConsumerSubscriber<T> subscriber = ConsumerSubscriber.consumerSubscriber(consumer);
        return this.subscribe(subscriber);
    }

    @Override
    public void unsubscribe(SubscriptionId subscriptionId) {
        if (this.closedAlreadyCalled) {
            throw new AlreadyClosedException();
        }
        this.subscribers.removeIf(subscriber -> subscriber.getSubscriptionId().equals(subscriptionId));
    }

    @Override
    public PipeStatusInformation<T> getStatusInformation() {
        return new PipeStatusInformation<T>(){

            @Override
            public PipeStatistics getCurrentMessageStatistics() {
                return PipeImpl.this.statisticsCollector.getCurrentStatistics();
            }

            @Override
            public List<Subscriber<T>> getAllSubscribers() {
                return PipeImpl.this.subscribers;
            }
        };
    }

    @Override
    public void close(boolean finishRemainingTasks) {
        if (!this.closedAlreadyCalled) {
            this.closedAlreadyCalled = true;
            this.transportMechanism.close(finishRemainingTasks);
        }
    }

    @Override
    public void close() {
        this.close(true);
    }

    @Override
    public boolean isClosed() {
        if (!this.closedAlreadyCalled) {
            return false;
        }
        return this.transportMechanism.isShutdown();
    }

    @Override
    public boolean awaitTermination(int timeout, TimeUnit timeUnit) throws InterruptedException {
        if (!this.closedAlreadyCalled) {
            return false;
        }
        return this.transportMechanism.awaitTermination(timeout, timeUnit);
    }
}

