package org.apache.avro.ipc.trace;

import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.Protocol;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.ipc.HttpServer;
import org.apache.avro.ipc.HttpTransceiver;
import org.apache.avro.ipc.RPCPlugin;
import org.apache.avro.ipc.generic.GenericRequestor;
import org.apache.avro.ipc.generic.GenericResponder;
import org.apache.avro.ipc.trace.SpanAggregator;
import org.apache.avro.ipc.trace.TracePlugin;
import org.apache.velocity.servlet.VelocityServlet;
import org.junit.Assert;
import org.junit.Test;

/* JADX WARN: Classes with same name are omitted:
  input_file:lib/avro-ipc-1.7.7-tests.jar:org/apache/avro/ipc/trace/TestEndToEndTracing.class
 */
/* loaded from: input_file:lib/cdap-etl-batch-4.1.1.jar:lib/avro-ipc-1.7.7-tests.jar:org/apache/avro/ipc/trace/TestEndToEndTracing.class */
public class TestEndToEndTracing {
    /**
     * Protocol shared by every requestor and responder in this test. It
     * declares three messages, {@code w}, {@code x} and {@code y}; each takes
     * a single int parameter named {@code req} and returns an int.
     */
    Protocol advancedProtocol = Protocol.parse(
            "{\"protocol\": \"Advanced\", \"messages\": { "
            + "\"w\": { \"request\": [{\"name\": \"req\", \"type\": \"int\"}],    \"response\": \"int\"},"
            + "\"x\": { \"request\": [{\"name\": \"req\", \"type\": \"int\"}],    \"response\": \"int\"},"
            + "\"y\": { \"request\": [{\"name\": \"req\", \"type\": \"int\"}],    \"response\": \"int\"}"
            + " } }");

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:lib/avro-ipc-1.7.7-tests.jar:org/apache/avro/ipc/trace/TestEndToEndTracing$EndpointResponder.class
     */
    /* loaded from: input_file:lib/cdap-etl-batch-4.1.1.jar:lib/avro-ipc-1.7.7-tests.jar:org/apache/avro/ipc/trace/TestEndToEndTracing$EndpointResponder.class */
    /**
     * Leaf responder for the tracing tests: replies to every call with the
     * integer found in the request's {@code req} field, incremented by one.
     */
    public static class EndpointResponder extends GenericResponder {
        /** Name of the single request parameter declared by the test protocol's messages. */
        private static final String REQUEST_FIELD = "req";

        /**
         * @param protocol the protocol this responder serves
         */
        public EndpointResponder(Protocol protocol) {
            super(protocol);
        }

        /**
         * Returns the request's {@code req} value plus one.
         *
         * @param message the message being invoked (not inspected)
         * @param obj     the request; cast to {@link GenericRecord}
         * @return the incremented value, boxed as an {@link Integer}
         * @throws AvroRemoteException declared by the overridden method; not thrown here
         */
        @Override // org.apache.avro.ipc.Responder
        public Object respond(Protocol.Message message, Object obj) throws AvroRemoteException {
            // Use the schema's own field name. The previous lookup borrowed
            // VelocityServlet.REQUEST, an unrelated constant that only works
            // for as long as its value stays equal to "req".
            int received = (Integer) ((GenericRecord) obj).get(REQUEST_FIELD);
            return Integer.valueOf(received + 1);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:lib/avro-ipc-1.7.7-tests.jar:org/apache/avro/ipc/trace/TestEndToEndTracing$RecursingResponder.class
     */
    /* loaded from: input_file:lib/cdap-etl-batch-4.1.1.jar:lib/avro-ipc-1.7.7-tests.jar:org/apache/avro/ipc/trace/TestEndToEndTracing$RecursingResponder.class */
    /**
     * Middle-tier responder for the tracing tests. On receiving message
     * {@code w} it issues message {@code x} to two downstream servers (ports
     * 21006 and 21007 on localhost), checks both replies, and then answers
     * the original caller.
     */
    public static class RecursingResponder extends GenericResponder {
        /** Name of the single request parameter declared by the test protocol's messages. */
        private static final String REQUEST_FIELD = "req";

        HttpTransceiver transC;
        HttpTransceiver transD;
        GenericRequestor reqC;
        GenericRequestor reqD;
        Protocol protocol;

        /**
         * Builds the two downstream requestors and attaches the plugin to both.
         *
         * @param protocol  protocol served by this responder and used for the downstream calls
         * @param rPCPlugin plugin registered on both downstream requestors
         * @throws Exception if a transceiver or requestor cannot be created
         */
        public RecursingResponder(Protocol protocol, RPCPlugin rPCPlugin) throws Exception {
            super(protocol);
            this.transC = new HttpTransceiver(new URL("http://localhost:21006"));
            this.transD = new HttpTransceiver(new URL("http://localhost:21007"));
            this.reqC = new GenericRequestor(protocol, this.transC);
            this.reqC.addRPCPlugin(rPCPlugin);
            this.reqD = new GenericRequestor(protocol, this.transD);
            this.reqD.addRPCPlugin(rPCPlugin);
            this.protocol = protocol;
        }

        /**
         * Handles message {@code w}: expects {@code req == 1}, sends
         * {@code req + 1} to the first downstream server and {@code req + 3}
         * to the second, and asserts that each replies with its input plus one.
         *
         * @param message the invoked message; must be named {@code w}
         * @param obj     the request; cast to {@link GenericRecord}
         * @return {@code req + 5}, boxed as an {@link Integer}
         * @throws IOException if a downstream request fails
         */
        @Override // org.apache.avro.ipc.Responder
        public Object respond(Protocol.Message message, Object obj) throws IOException {
            // assertEquals reports expected/actual values on failure, unlike
            // assertTrue(a.equals(b)).
            Assert.assertEquals("w", message.getName());
            // Use the schema's own field name rather than the unrelated
            // VelocityServlet.REQUEST constant.
            int received = (Integer) ((GenericRecord) obj).get(REQUEST_FIELD);
            Assert.assertEquals(1, received);

            Assert.assertEquals(Integer.valueOf(received + 2), callX(this.reqC, received + 1));
            Assert.assertEquals(Integer.valueOf(received + 4), callX(this.reqD, received + 3));
            return Integer.valueOf(received + 5);
        }

        /** Sends message {@code x} carrying {@code value} through the given requestor and returns the reply. */
        private Object callX(GenericRequestor requestor, int value) throws IOException {
            GenericData.Record params = new GenericData.Record(this.protocol.getMessages().get("x").getRequest());
            params.put(REQUEST_FIELD, Integer.valueOf(value));
            return requestor.request("x", params);
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:lib/avro-ipc-1.7.7-tests.jar:org/apache/avro/ipc/trace/TestEndToEndTracing$SleepyResponder.class
     */
    /* loaded from: input_file:lib/cdap-etl-batch-4.1.1.jar:lib/avro-ipc-1.7.7-tests.jar:org/apache/avro/ipc/trace/TestEndToEndTracing$SleepyResponder.class */
    /**
     * Responder that sleeps for the number of milliseconds given in the
     * request's {@code millis} field and then returns {@code null}.
     */
    private static class SleepyResponder extends GenericResponder {
        /**
         * @param protocol the protocol this responder serves
         */
        public SleepyResponder(Protocol protocol) {
            super(protocol);
        }

        /**
         * Blocks the calling thread for the requested duration.
         *
         * @param message the message being invoked (not inspected)
         * @param obj     the request; cast to {@link GenericRecord} and read for {@code millis}
         * @return always {@code null}
         * @throws AvroRemoteException wrapping the {@link InterruptedException} if the sleep is interrupted
         */
        @Override // org.apache.avro.ipc.Responder
        public Object respond(Protocol.Message message, Object obj) throws AvroRemoteException {
            long millis = ((Long) ((GenericRecord) obj).get("millis")).longValue();
            try {
                Thread.sleep(millis);
                return null;
            } catch (InterruptedException e) {
                // Catching InterruptedException clears the interrupt flag;
                // restore it so code further up the stack can still see it.
                Thread.currentThread().interrupt();
                throw new AvroRemoteException((Throwable) e);
            }
        }
    }

    /**
     * Runs the shared end-to-end scenario with a configuration whose storage
     * type is set to {@code MEMORY}.
     */
    @Test
    public void testTraceAndCollectionMemory() throws Exception {
        final TracePluginConfiguration memoryConf = new TracePluginConfiguration();
        memoryConf.storageType = TracePlugin.StorageType.MEMORY;
        this.testTraceAndCollection(memoryConf);
    }

    /**
     * Builds a configuration that requests {@code DISK} storage with
     * buffering switched off and hands it to the shared end-to-end scenario.
     */
    @Test
    public void testTraceAndCollectionDisk() throws Exception {
        final TracePluginConfiguration diskConf = new TracePluginConfiguration();
        diskConf.buffer = false;
        diskConf.storageType = TracePlugin.StorageType.DISK;
        this.testTraceAndCollection(diskConf);
    }

    /**
     * Shared end-to-end scenario. A client requestor calls message {@code w}
     * on a {@link RecursingResponder} (port 21005), which in turn calls
     * message {@code x} on two {@link EndpointResponder}s (ports 21006 and
     * 21007). Each of the four parties has its own {@link TracePlugin}; the
     * spans from all four are merged and must form exactly one trace with no
     * rejected spans.
     *
     * <p>The configuration's {@code traceProb}, {@code port} and
     * {@code clientPort} are overwritten here. Its {@code storageType} is left
     * as the caller set it, so each calling test exercises the storage it asks
     * for.
     *
     * <p>All servers and plugin listeners are shut down in a {@code finally}
     * block, so a failed assertion does not leave the fixed ports bound for
     * the next test.
     *
     * @param tracePluginConfiguration configuration used (and mutated) to build the four plugins
     * @throws Exception if setup, the RPC, or teardown fails
     */
    public void testTraceAndCollection(TracePluginConfiguration tracePluginConfiguration) throws Exception {
        tracePluginConfiguration.traceProb = 1.0d;

        TracePlugin clientPlugin = null;
        TracePlugin recursingPlugin = null;
        TracePlugin endpointPluginC = null;
        TracePlugin endpointPluginD = null;
        HttpServer recursingServer = null;
        HttpServer endpointServerC = null;
        HttpServer endpointServerD = null;
        boolean bodyCompleted = false;
        try {
            clientPlugin = newTracePlugin(tracePluginConfiguration, 51010, 12346);
            recursingPlugin = newTracePlugin(tracePluginConfiguration, 51011, 12347);
            endpointPluginC = newTracePlugin(tracePluginConfiguration, 51012, 12348);
            endpointPluginD = newTracePlugin(tracePluginConfiguration, 51013, 12349);

            RecursingResponder recursingResponder = new RecursingResponder(this.advancedProtocol, recursingPlugin);
            recursingResponder.addRPCPlugin(recursingPlugin);
            recursingServer = new HttpServer(recursingResponder, 21005);
            recursingServer.start();

            EndpointResponder endpointResponderC = new EndpointResponder(this.advancedProtocol);
            endpointResponderC.addRPCPlugin(endpointPluginC);
            endpointServerC = new HttpServer(endpointResponderC, 21006);
            endpointServerC.start();

            EndpointResponder endpointResponderD = new EndpointResponder(this.advancedProtocol);
            endpointResponderD.addRPCPlugin(endpointPluginD);
            endpointServerD = new HttpServer(endpointResponderD, 21007);
            endpointServerD.start();

            GenericRequestor client = new GenericRequestor(this.advancedProtocol, new HttpTransceiver(new URL("http://localhost:21005")));
            client.addRPCPlugin(clientPlugin);
            GenericData.Record params = new GenericData.Record(this.advancedProtocol.getMessages().get("w").getRequest());
            // "req" is the parameter name declared by the protocol; use it
            // directly rather than the unrelated VelocityServlet.REQUEST.
            params.put("req", Integer.valueOf(1));
            client.request("w", params);

            // NOTE(review): presumably gives the plugins time to record their
            // spans before they are read back -- confirm whether this is needed.
            Thread.sleep(1000L);

            // NOTE(review): raw type kept because the span element type is not
            // visible in this file; parameterize once confirmed.
            ArrayList allSpans = new ArrayList();
            allSpans.addAll(clientPlugin.storage.getAllSpans());
            allSpans.addAll(recursingPlugin.storage.getAllSpans());
            allSpans.addAll(endpointPluginC.storage.getAllSpans());
            allSpans.addAll(endpointPluginD.storage.getAllSpans());

            SpanAggregator.TraceFormationResults traces = SpanAggregator.getTraces(SpanAggregator.getFullSpans(allSpans).completeSpans);
            Assert.assertEquals(1L, traces.traces.size());
            Assert.assertEquals(0L, traces.rejectedSpans.size());

            String timed = traces.traces.get(0).printWithTiming();
            assertShowsCallChain(timed);
            String brief = traces.traces.get(0).printBrief();
            assertShowsCallChain(brief);

            System.out.println(timed);
            System.out.println(brief);
            bodyCompleted = true;
        } finally {
            Exception cleanupFailure = shutDown(
                    new HttpServer[] {recursingServer, endpointServerC, endpointServerD},
                    new TracePlugin[] {clientPlugin, recursingPlugin, endpointPluginC, endpointPluginD});
            // Only surface a teardown problem when the scenario itself passed,
            // so it can never mask the original test failure.
            if (bodyCompleted && cleanupFailure != null) {
                throw cleanupFailure;
            }
        }
    }

    /** Sets the two ports on the shared configuration and builds a plugin from it. */
    private static TracePlugin newTracePlugin(TracePluginConfiguration conf, int port, int clientPort) throws Exception {
        conf.port = port;
        conf.clientPort = clientPort;
        return new TracePlugin(conf);
    }

    /** Asserts that a rendered trace mentions {@code w} and mentions {@code x} at two distinct positions. */
    private static void assertShowsCallChain(String rendering) {
        Assert.assertTrue(rendering.contains("w"));
        Assert.assertTrue(rendering.contains("x"));
        Assert.assertTrue(rendering.indexOf("x") != rendering.lastIndexOf("x"));
    }

    /**
     * Closes every non-null server and plugin listener, continuing past
     * individual failures, and returns the first failure seen (or {@code null}).
     */
    private static Exception shutDown(HttpServer[] servers, TracePlugin[] plugins) {
        Exception first = null;
        for (HttpServer server : servers) {
            if (server == null) {
                continue;
            }
            try {
                server.close();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                }
            }
        }
        for (TracePlugin plugin : plugins) {
            if (plugin == null) {
                continue;
            }
            try {
                plugin.httpServer.close();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                }
            }
            try {
                plugin.clientFacingServer.stop();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                }
            }
        }
        return first;
    }
}
