package co.cask.hydrator.plugin.spark;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.etl.api.streaming.StreamingContext;
import co.cask.cdap.etl.api.streaming.StreamingSource;
import co.cask.hydrator.common.http.HTTPPollConfig;
import co.cask.hydrator.common.http.HTTPRequestor;
import java.util.concurrent.TimeUnit;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Name("HTTPPoller")
@Description("Fetch data by performing an HTTP request at a regular interval.")
@Plugin(type = "streamingsource")
/* loaded from: input_file:co/cask/hydrator/plugin/spark/HTTPPollerSource.class */
public class HTTPPollerSource extends StreamingSource<StructuredRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(HTTPPollerSource.class);
    private final HTTPPollConfig conf;

    public HTTPPollerSource(HTTPPollConfig hTTPPollConfig) {
        this.conf = hTTPPollConfig;
    }

    public JavaDStream<StructuredRecord> getStream(StreamingContext streamingContext) throws Exception {
        return streamingContext.getSparkStreamingContext().receiverStream(new Receiver<StructuredRecord>(StorageLevel.MEMORY_ONLY()) { // from class: co.cask.hydrator.plugin.spark.HTTPPollerSource.1
            public StorageLevel storageLevel() {
                return StorageLevel.MEMORY_ONLY();
            }

            /* JADX WARN: Type inference failed for: r0v0, types: [co.cask.hydrator.plugin.spark.HTTPPollerSource$1$1] */
            public void onStart() {
                new Thread() { // from class: co.cask.hydrator.plugin.spark.HTTPPollerSource.1.1
                    @Override // java.lang.Thread, java.lang.Runnable
                    public void run() {
                        HTTPRequestor hTTPRequestor = new HTTPRequestor(HTTPPollerSource.this.conf);
                        while (!isStopped()) {
                            try {
                                store(hTTPRequestor.get());
                            } catch (Exception e) {
                                HTTPPollerSource.LOG.error("Error getting content from {}.", HTTPPollerSource.this.conf.getUrl(), e);
                            }
                            try {
                                TimeUnit.SECONDS.sleep(HTTPPollerSource.this.conf.getInterval());
                            } catch (InterruptedException e2) {
                                throw new RuntimeException(e2);
                            }
                        }
                    }

                    @Override // java.lang.Thread
                    public void interrupt() {
                        super.interrupt();
                    }
                }.start();
            }

            public void onStop() {
            }
        });
    }
}
