package net.thisptr.flume.plugins.channel.synchronous;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.channel.BasicChannelSemantics;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.apache.flume.conf.TransactionCapacitySupported;
import org.apache.flume.instrumentation.ChannelCounter;

/* loaded from: input_file:net/thisptr/flume/plugins/channel/synchronous/SynchronousChannel.class */
public class SynchronousChannel extends BasicChannelSemantics implements TransactionCapacitySupported {
    // Flume instrumentation counter; created on the first configure(Context) call and
    // started/stopped together with the channel in start()/stop().
    private ChannelCounter channelCounter;
    // Default passed to context.getLong("takeTimeout", ...) in configure(Context).
    private static final long DEFAULT_TAKE_TIMEOUT_MILLIS = 1000;
    // Value of the "takeTimeout" setting. NOTE(review): no reader is visible in this
    // decompiled file; presumably doTake() uses it to bound its wait — TODO confirm.
    private volatile long takeTimeoutMillis = DEFAULT_TAKE_TIMEOUT_MILLIS;
    // Transactions registered by doCommit()/doRollback() as having events in their putList.
    // Every visible access happens while holding this list's own monitor, which is also
    // the object notifyAll() is called on after a registration.
    private final LinkedList<SynchronousTransaction> txWithPendingEvents = new LinkedList<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/thisptr/flume/plugins/channel/synchronous/SynchronousChannel$PendingEvent.class */
    /**
     * Immutable pairing of an event with the transaction that put it, so that a taking
     * transaction can later find the put transaction the event came from.
     */
    public static class PendingEvent {
        /** The event handed to the put transaction. */
        private final Event event;
        /** The transaction whose {@code doPut} created this entry. */
        private final SynchronousTransaction putTx;

        /**
         * Creates the pairing.
         *
         * @param event the event being held
         * @param synchronousTransaction the transaction that put {@code event}
         */
        public PendingEvent(Event event, SynchronousTransaction synchronousTransaction) {
            this.putTx = synchronousTransaction;
            this.event = event;
        }
    }

    /* loaded from: input_file:net/thisptr/flume/plugins/channel/synchronous/SynchronousChannel$SynchronousTransaction.class */
    private class SynchronousTransaction extends BasicTransactionSemantics {
        // Events added by doPut() on this transaction; a rolling-back taker pushes events it
        // had taken back onto the front of this list.
        private final LinkedList<PendingEvent> putList;
        // Events this transaction holds as taken; each entry remembers its put transaction.
        // NOTE(review): presumably filled by doTake(), which is not decompiled — TODO confirm.
        private final LinkedList<PendingEvent> takeList;
        // Transactions this (put) transaction is still waiting on: doCommit() blocks until
        // this set and putList are both empty; a taker removes itself in its own
        // doCommit()/doRollback(). NOTE(review): presumably populated by doTake() — TODO confirm.
        private final Set<SynchronousTransaction> dependencyTxs;

        /**
         * Creates a transaction with empty put, take and dependency collections.
         */
        private SynchronousTransaction() {
            this.putList = new LinkedList<>();
            this.takeList = new LinkedList<>();
            // Diamond instead of the raw HashSet the decompiler emitted, so the assignment
            // to Set<SynchronousTransaction> is type-checked rather than unchecked.
            this.dependencyTxs = new HashSet<>();
        }

        /**
         * Records a put attempt on the channel counter and appends the event, tagged with
         * this transaction, to the end of {@code putList} while holding this transaction's
         * monitor.
         *
         * @param event the event to stage
         */
        protected void doPut(Event event) {
            SynchronousChannel.this.channelCounter.incrementEventPutAttemptCount();
            final PendingEvent staged = new PendingEvent(event, this);
            synchronized (this) {
                putList.add(staged);
            }
        }

        /* JADX WARN: Code restructure failed: missing block: B:54:0x00e0, code lost:
        
            r0 = r0.putList.removeFirst();
         */
        /* JADX WARN: Code restructure failed: missing block: B:55:0x00f5, code lost:
        
            if (r0.putList.isEmpty() == false) goto L68;
         */
        /* JADX WARN: Code restructure failed: missing block: B:56:0x00f8, code lost:
        
            r0 = r6.this$0.txWithPendingEvents;
         */
        /* JADX WARN: Code restructure failed: missing block: B:57:0x0102, code lost:
        
            monitor-enter(r0);
         */
        /* JADX WARN: Code restructure failed: missing block: B:59:0x0103, code lost:
        
            r6.this$0.txWithPendingEvents.remove(r0);
         */
        /* JADX WARN: Code restructure failed: missing block: B:60:0x0112, code lost:
        
            monitor-exit(r0);
         */
        /* JADX WARN: Code restructure failed: missing block: B:68:0x011e, code lost:
        
            r0.dependencyTxs.add(r6);
         */
        /* JADX WARN: Code restructure failed: missing block: B:72:0x013f, code lost:
        
            monitor-enter(r6);
         */
        /* JADX WARN: Code restructure failed: missing block: B:74:0x0140, code lost:
        
            r6.takeList.addLast(r0);
         */
        /* JADX WARN: Code restructure failed: missing block: B:75:0x014b, code lost:
        
            monitor-exit(r6);
         */
        /* JADX WARN: Code restructure failed: missing block: B:78:0x015c, code lost:
        
            return r0.event;
         */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        /**
         * Takes one event on behalf of this transaction.
         *
         * <p>NOTE(review): the decompiler could not reconstruct this method, so the body below
         * unconditionally throws {@link UnsupportedOperationException}; it is a placeholder,
         * not the real implementation. The instruction fragments preserved in the JADX
         * warnings above suggest that the original removes the first {@code PendingEvent}
         * from a put transaction's {@code putList}, drops that transaction from
         * {@code txWithPendingEvents} once its list is empty, adds this transaction to its
         * {@code dependencyTxs}, appends the event to {@code takeList} and returns the event
         * — TODO restore the logic from the original source or bytecode and confirm.
         *
         * @return never returns normally in this decompiled form
         * @throws java.lang.InterruptedException declared by the signature; not thrown by the placeholder body
         * @throws UnsupportedOperationException always, in this decompiled form
         */
        protected org.apache.flume.Event doTake() throws java.lang.InterruptedException {
            /*
                Method dump skipped, instructions count: 349
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: net.thisptr.flume.plugins.channel.synchronous.SynchronousChannel.SynchronousTransaction.doTake():org.apache.flume.Event");
        }

        /**
         * Commits this transaction: first its take side, then its put side.
         *
         * <p>Take side: for every distinct put transaction that supplied an entry in
         * {@code takeList}, removes this transaction from that transaction's
         * {@code dependencyTxs} and, once that set is empty, calls {@code notify()} on it.
         * The put-success and take-success counters are then both advanced by the number of
         * taken events and {@code takeList} is cleared.
         *
         * <p>Put side: if {@code putList} is non-empty, registers this transaction in
         * {@code txWithPendingEvents}, wakes every thread waiting on that list, and then
         * blocks in {@code wait()} until {@code putList} and {@code dependencyTxs} are both
         * empty.
         *
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        protected void doCommit() throws InterruptedException {
            synchronized (this) {
                if (!this.takeList.isEmpty()) {
                    // Typed set: the decompiler's raw (Set) cast made the loop variable an
                    // Object, which does not type-check against SynchronousTransaction.
                    Set<SynchronousTransaction> putTxs = this.takeList.stream()
                            .map(pendingEvent -> pendingEvent.putTx)
                            .collect(Collectors.toSet());
                    for (SynchronousTransaction putTx : putTxs) {
                        synchronized (putTx) {
                            putTx.dependencyTxs.remove(this);
                            if (putTx.dependencyTxs.isEmpty()) {
                                // The waiter re-checks its own condition in its wait loop.
                                putTx.notify();
                            }
                        }
                    }
                    int taken = this.takeList.size();
                    SynchronousChannel.this.channelCounter.addToEventPutSuccessCount(taken);
                    SynchronousChannel.this.channelCounter.addToEventTakeSuccessCount(taken);
                    this.takeList.clear();
                }
                if (!this.putList.isEmpty()) {
                    synchronized (SynchronousChannel.this.txWithPendingEvents) {
                        // LinkedList.add always returns true, so the original
                        // "if (add(...))" guard was always taken; register and notify directly.
                        SynchronousChannel.this.txWithPendingEvents.add(this);
                        SynchronousChannel.this.txWithPendingEvents.notifyAll();
                    }
                    // Loop guards against spurious wake-ups and early notifications.
                    while (!this.putList.isEmpty() || !this.dependencyTxs.isEmpty()) {
                        wait();
                    }
                }
            }
        }

        /**
         * Rolls this transaction back.
         *
         * <p>Take side: every entry in {@code takeList} is returned to the front of the
         * {@code putList} of the transaction that put it, in the order it was taken; this
         * transaction is removed from that transaction's {@code dependencyTxs}; and the put
         * transaction is (re-)registered in {@code txWithPendingEvents}, waking threads
         * waiting on that list. {@code takeList} is then cleared.
         *
         * <p>Put side: if {@code putList} is non-empty, this transaction is removed from
         * {@code txWithPendingEvents} and {@code putList} is cleared.
         */
        protected void doRollback() {
            synchronized (this) {
                if (!this.takeList.isEmpty()) {
                    // Typed map: the decompiler's raw (Map) cast left the lambda parameters
                    // as Object, so field access on them did not type-check.
                    Map<SynchronousTransaction, ArrayList<PendingEvent>> eventsByPutTx = this.takeList.stream()
                            .collect(Collectors.toMap(
                                    pendingEvent -> pendingEvent.putTx,
                                    pendingEvent -> new ArrayList<PendingEvent>(Collections.singletonList(pendingEvent)),
                                    (merged, more) -> {
                                        merged.addAll(more);
                                        return merged;
                                    }));
                    for (Map.Entry<SynchronousTransaction, ArrayList<PendingEvent>> entry : eventsByPutTx.entrySet()) {
                        SynchronousTransaction putTx = entry.getKey();
                        ArrayList<PendingEvent> events = entry.getValue();
                        synchronized (putTx) {
                            // Walk backwards: pushing with addFirst in forward order would
                            // hand the events back reversed.
                            for (int i = events.size() - 1; i >= 0; i--) {
                                putTx.putList.addFirst(events.get(i));
                            }
                            putTx.dependencyTxs.remove(this);
                        }
                        synchronized (SynchronousChannel.this.txWithPendingEvents) {
                            // A put transaction that still had events left is already
                            // registered; LinkedList.add would insert a duplicate entry.
                            if (!SynchronousChannel.this.txWithPendingEvents.contains(putTx)) {
                                SynchronousChannel.this.txWithPendingEvents.add(putTx);
                                SynchronousChannel.this.txWithPendingEvents.notifyAll();
                            }
                        }
                    }
                    this.takeList.clear();
                }
                if (!this.putList.isEmpty()) {
                    // doCommit() registers this transaction before it waits; if that wait was
                    // interrupted the registration is still there, so drop it before the
                    // events are discarded. No-op when this transaction was never registered.
                    synchronized (SynchronousChannel.this.txWithPendingEvents) {
                        SynchronousChannel.this.txWithPendingEvents.remove(this);
                    }
                    this.putList.clear();
                }
            }
        }
    }

    /**
     * Applies channel configuration: creates the channel counter on the first call (named
     * after this channel) and reads the {@code takeTimeout} setting, using
     * {@code DEFAULT_TAKE_TIMEOUT_MILLIS} as the default.
     *
     * @param context the configuration to read from
     */
    public void configure(Context context) {
        if (channelCounter == null) {
            channelCounter = new ChannelCounter(getName());
        }
        Long configuredTimeout = context.getLong("takeTimeout", DEFAULT_TAKE_TIMEOUT_MILLIS);
        takeTimeoutMillis = configuredTimeout;
    }

    /**
     * Starts the channel counter, then delegates to the superclass start.
     */
    public synchronized void start() {
        channelCounter.start();
        super.start();
    }

    /**
     * Delegates to the superclass stop, then stops the channel counter.
     */
    public synchronized void stop() {
        super.stop();
        channelCounter.stop();
    }

    /**
     * Creates a new transaction bound to this channel.
     *
     * @return a fresh {@code SynchronousTransaction}
     */
    protected BasicTransactionSemantics createTransaction() {
        final SynchronousTransaction transaction = new SynchronousTransaction();
        return transaction;
    }

    /**
     * Reports the transaction capacity of this channel.
     *
     * @return {@code Integer.MAX_VALUE} (2147483647), widened to {@code long}
     */
    public long getTransactionCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Exposes the channel counter to tests.
     *
     * @return the counter instance, or {@code null} if {@code configure} has not run yet
     */
    @VisibleForTesting
    ChannelCounter getChannelCounter() {
        return channelCounter;
    }
}
