package co.cask.cdap.test.messaging;

import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.messaging.Message;
import co.cask.cdap.api.messaging.MessageFetcher;
import co.cask.cdap.api.messaging.MessagePublisher;
import co.cask.cdap.api.worker.AbstractWorker;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.tephra.TransactionFailureException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/test/messaging/MessagingApp.class */
public class MessagingApp extends AbstractApplication {
    static final String TOPIC = "topic";
    static final String CONTROL_TOPIC = "controlTopic";

    /* loaded from: input_file:co/cask/cdap/test/messaging/MessagingApp$MessagingWorker.class */
    public static final class MessagingWorker extends AbstractWorker {
        public void run() {
            try {
                getContext().getAdmin().createTopic(MessagingApp.TOPIC);
                final MessageFetcher messageFetcher = getContext().getMessageFetcher();
                Message fetchMessage = MessagingApp.fetchMessage(messageFetcher, getContext().getNamespace(), MessagingApp.TOPIC, null, 3L, TimeUnit.SECONDS);
                final MessagePublisher messagePublisher = getContext().getMessagePublisher();
                String payloadAsString = fetchMessage.getPayloadAsString();
                messagePublisher.publish(getContext().getNamespace(), MessagingApp.TOPIC, new String[]{payloadAsString + payloadAsString});
                final AtomicReference atomicReference = new AtomicReference(fetchMessage.getId());
                getContext().execute(new TxRunnable() { // from class: co.cask.cdap.test.messaging.MessagingApp.MessagingWorker.1
                    public void run(DatasetContext datasetContext) throws Exception {
                        Message fetchMessage2 = MessagingApp.fetchMessage(messageFetcher, MessagingWorker.this.getContext().getNamespace(), MessagingApp.TOPIC, (String) atomicReference.get(), 3L, TimeUnit.SECONDS);
                        atomicReference.set(fetchMessage2.getId());
                        String payloadAsString2 = fetchMessage2.getPayloadAsString();
                        messagePublisher.publish(MessagingWorker.this.getContext().getNamespace(), MessagingApp.TOPIC, new String[]{payloadAsString2 + payloadAsString2});
                        MessagingApp.fetchMessage(messageFetcher, MessagingWorker.this.getContext().getNamespace(), MessagingApp.CONTROL_TOPIC, null, 10L, TimeUnit.SECONDS);
                    }
                });
            } catch (Exception e) {
                throw Throwables.propagate(e);
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/test/messaging/MessagingApp$TransactionalMessagingWorker.class */
    public static final class TransactionalMessagingWorker extends AbstractWorker {
        private static final Logger LOG = LoggerFactory.getLogger(TransactionalMessagingWorker.class);

        public void run() {
            final boolean booleanValue = Boolean.valueOf((String) getContext().getRuntimeArguments().get("get.in.tx")).booleanValue();
            final AtomicBoolean atomicBoolean = new AtomicBoolean();
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            final CountDownLatch countDownLatch2 = new CountDownLatch(1);
            Thread thread = new Thread(getClass().getSimpleName() + "-publish") { // from class: co.cask.cdap.test.messaging.MessagingApp.TransactionalMessagingWorker.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        final AtomicReference atomicReference = new AtomicReference();
                        if (!booleanValue) {
                            TransactionalMessagingWorker.LOG.info("Get publisher outside of TX");
                            atomicReference.set(TransactionalMessagingWorker.this.getContext().getMessagePublisher());
                        }
                        TransactionalMessagingWorker.this.getContext().execute(new TxRunnable() { // from class: co.cask.cdap.test.messaging.MessagingApp.TransactionalMessagingWorker.1.1
                            public void run(DatasetContext datasetContext) throws Exception {
                                MessagePublisher messagePublisher = (MessagePublisher) atomicReference.get();
                                if (messagePublisher == null) {
                                    TransactionalMessagingWorker.LOG.info("Get publisher inside of TX");
                                    messagePublisher = TransactionalMessagingWorker.this.getContext().getMessagePublisher();
                                }
                                messagePublisher.publish(TransactionalMessagingWorker.this.getContext().getNamespace(), MessagingApp.TOPIC, new String[]{"payload"});
                                TransactionalMessagingWorker.LOG.info("Message published");
                                countDownLatch.countDown();
                                countDownLatch2.await();
                            }
                        });
                    } catch (TransactionFailureException e) {
                        throw Throwables.propagate(e);
                    }
                }
            };
            Thread thread2 = new Thread(getClass().getSimpleName() + "-fetch") { // from class: co.cask.cdap.test.messaging.MessagingApp.TransactionalMessagingWorker.2
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    final AtomicReference atomicReference = new AtomicReference();
                    if (!booleanValue) {
                        TransactionalMessagingWorker.LOG.info("Get fetcher outside of TX");
                        atomicReference.set(TransactionalMessagingWorker.this.getContext().getMessageFetcher());
                    }
                    try {
                        countDownLatch.await();
                        TransactionalMessagingWorker.this.getContext().execute(new TxRunnable() { // from class: co.cask.cdap.test.messaging.MessagingApp.TransactionalMessagingWorker.2.1
                            public void run(DatasetContext datasetContext) throws Exception {
                                MessageFetcher messageFetcher = (MessageFetcher) atomicReference.get();
                                if (messageFetcher == null) {
                                    TransactionalMessagingWorker.LOG.info("Get fetcher inside of TX");
                                    messageFetcher = TransactionalMessagingWorker.this.getContext().getMessageFetcher();
                                }
                                try {
                                    MessagingApp.fetchMessage(messageFetcher, TransactionalMessagingWorker.this.getContext().getNamespace(), MessagingApp.TOPIC, null, 3L, TimeUnit.SECONDS);
                                    throw new IllegalStateException("Expected timeout");
                                } catch (TimeoutException e) {
                                    TransactionalMessagingWorker.LOG.info("Expected timeout exception raised");
                                    countDownLatch2.countDown();
                                    try {
                                        MessagingApp.fetchMessage(messageFetcher, TransactionalMessagingWorker.this.getContext().getNamespace(), MessagingApp.TOPIC, null, 3L, TimeUnit.SECONDS);
                                        throw new IllegalStateException("Expected timeout");
                                    } catch (TimeoutException e2) {
                                        TransactionalMessagingWorker.LOG.info("Expected timeout exception raised");
                                    }
                                }
                            }
                        });
                        TransactionalMessagingWorker.this.getContext().execute(new TxRunnable() { // from class: co.cask.cdap.test.messaging.MessagingApp.TransactionalMessagingWorker.2.2
                            public void run(DatasetContext datasetContext) throws Exception {
                                MessageFetcher messageFetcher = (MessageFetcher) atomicReference.get();
                                if (messageFetcher == null) {
                                    TransactionalMessagingWorker.LOG.info("Get fetcher inside of TX");
                                    messageFetcher = TransactionalMessagingWorker.this.getContext().getMessageFetcher();
                                }
                                Message fetchMessage = MessagingApp.fetchMessage(messageFetcher, TransactionalMessagingWorker.this.getContext().getNamespace(), MessagingApp.TOPIC, null, 3L, TimeUnit.SECONDS);
                                TransactionalMessagingWorker.LOG.info("Message fetched {}", fetchMessage);
                                atomicBoolean.set("payload".equals(fetchMessage.getPayloadAsString()));
                            }
                        });
                    } catch (Exception e) {
                        throw Throwables.propagate(e);
                    }
                }
            };
            thread.start();
            thread2.start();
            Uninterruptibles.joinUninterruptibly(thread);
            Uninterruptibles.joinUninterruptibly(thread2);
            Preconditions.checkState(atomicBoolean.get(), "Expected succeeded to be true.");
        }
    }

    public void configure() {
        addWorker(new MessagingWorker());
        addWorker(new TransactionalMessagingWorker());
        addSpark(new MessagingSpark());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Message fetchMessage(MessageFetcher messageFetcher, String str, String str2, @Nullable String str3, long j, TimeUnit timeUnit) throws Exception {
        CloseableIterator fetch = messageFetcher.fetch(str, str2, 1, str3);
        Stopwatch start = new Stopwatch().start();
        while (!fetch.hasNext() && start.elapsedTime(timeUnit) < j) {
            try {
                TimeUnit.MILLISECONDS.sleep(100L);
                fetch = messageFetcher.fetch(str, str2, 1, str3);
            } catch (Throwable th) {
                fetch.close();
                throw th;
            }
        }
        if (!fetch.hasNext()) {
            throw new TimeoutException("Failed to get any messages from " + str2 + " in " + j + " " + timeUnit.name().toLowerCase());
        }
        Message message = (Message) fetch.next();
        fetch.close();
        return message;
    }
}
