package co.cask.hydrator.plugin.spark;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Macro;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.streaming.StreamingContext;
import co.cask.hydrator.common.ReferencePluginConfig;
import com.google.common.annotations.VisibleForTesting;
import java.io.Serializable;
import java.util.Date;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.twitter.TwitterUtils;
import twitter4j.GeoLocation;
import twitter4j.Status;
import twitter4j.auth.OAuthAuthorization;
import twitter4j.conf.ConfigurationBuilder;

/**
 * Streaming source plugin that emits tweets as {@link StructuredRecord}s.
 *
 * <p>Statuses are obtained through {@code TwitterUtils.createStream} using the OAuth credentials
 * held by {@link TwitterStreamingConfig}; each status is converted into a record of the fixed
 * {@code tweet} schema declared in this class.
 */
@Name("Twitter")
@Description("Twitter streaming source.")
@Plugin(type = "streamingsource")
public class TwitterStreamingSource extends ReferenceStreamingSource<StructuredRecord> {
    // Output field names.
    private static final String ID = "id";
    private static final String MSG = "message";
    private static final String LANG = "lang";
    private static final String TIME = "time";
    private static final String FAVC = "favCount";
    private static final String RTC = "rtCount";
    private static final String SRC = "source";
    private static final String GLAT = "geoLat";
    private static final String GLNG = "geoLong";
    private static final String ISRT = "isRetweet";

    /**
     * Output schema of every emitted record. {@code lang}, {@code time}, {@code source},
     * {@code geoLat} and {@code geoLong} are nullable; all other fields are required.
     */
    private static final Schema SCHEMA = Schema.recordOf(
        "tweet",
        Schema.Field.of(ID, Schema.of(Schema.Type.LONG)),
        Schema.Field.of(MSG, Schema.of(Schema.Type.STRING)),
        Schema.Field.of(LANG, Schema.nullableOf(Schema.of(Schema.Type.STRING))),
        Schema.Field.of(TIME, Schema.nullableOf(Schema.of(Schema.Type.LONG))),
        Schema.Field.of(FAVC, Schema.of(Schema.Type.INT)),
        Schema.Field.of(RTC, Schema.of(Schema.Type.INT)),
        Schema.Field.of(SRC, Schema.nullableOf(Schema.of(Schema.Type.STRING))),
        Schema.Field.of(GLAT, Schema.nullableOf(Schema.of(Schema.Type.DOUBLE))),
        Schema.Field.of(GLNG, Schema.nullableOf(Schema.of(Schema.Type.DOUBLE))),
        Schema.Field.of(ISRT, Schema.of(Schema.Type.BOOLEAN)));

    private final TwitterStreamingConfig config;

    /**
     * Plugin configuration: a reference name (handled by {@link ReferencePluginConfig}) plus the
     * four OAuth values passed to the Twitter client. All four credential fields are marked
     * {@code @Macro}.
     */
    public static class TwitterStreamingConfig extends ReferencePluginConfig implements Serializable {
        private static final long serialVersionUID = 4218063781909515444L;

        @Name("ConsumerKey")
        @Description("Consumer Key")
        @Macro
        private String consumerKey;

        @Name("ConsumerSecret")
        @Description("Consumer Secret")
        @Macro
        private String consumerSecret;

        @Name("AccessToken")
        @Description("Access Token")
        @Macro
        private String accessToken;

        @Name("AccessTokenSecret")
        @Description("Access Token Secret")
        @Macro
        private String accessTokenSecret;

        /**
         * Creates a fully populated configuration.
         *
         * @param referenceName     reference name forwarded to {@link ReferencePluginConfig}
         * @param consumerKey       OAuth consumer key
         * @param consumerSecret    OAuth consumer secret
         * @param accessToken       OAuth access token
         * @param accessTokenSecret OAuth access token secret
         */
        @VisibleForTesting
        public TwitterStreamingConfig(String referenceName, String consumerKey, String consumerSecret,
                                      String accessToken, String accessTokenSecret) {
            super(referenceName);
            this.consumerKey = consumerKey;
            this.consumerSecret = consumerSecret;
            this.accessToken = accessToken;
            this.accessTokenSecret = accessTokenSecret;
        }
    }

    /**
     * Maps a {@link Status} to a {@link StructuredRecord}.
     *
     * <p>Declared as a static nested class rather than an anonymous inner class so that the
     * function object holds no implicit reference to the enclosing source instance (and therefore
     * not to its credential-bearing config) when it is handed to Spark.
     */
    private static final class TweetConverter implements Function<Status, StructuredRecord> {
        private static final long serialVersionUID = 1L;

        @Override
        public StructuredRecord call(Status status) {
            return toRecord(status);
        }
    }

    /**
     * Creates the source.
     *
     * @param twitterStreamingConfig plugin configuration; also passed to the superclass
     */
    public TwitterStreamingSource(TwitterStreamingConfig twitterStreamingConfig) {
        super(twitterStreamingConfig);
        this.config = twitterStreamingConfig;
    }

    /**
     * Runs the superclass configuration and then declares {@link #SCHEMA} as the stage's output
     * schema.
     *
     * @param pipelineConfigurer configurer for the pipeline stage
     */
    @Override // co.cask.hydrator.plugin.spark.ReferenceStreamingSource
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        pipelineConfigurer.getStageConfigurer().setOutputSchema(SCHEMA);
    }

    /**
     * Registers lineage for the configured reference name, then opens a Twitter stream with the
     * configured OAuth credentials and maps every status to a record.
     *
     * @param streamingContext context supplying the Spark streaming context
     * @return stream of records conforming to {@link #SCHEMA}
     * @throws Exception declared by the overridden method
     */
    @Override
    public JavaDStream<StructuredRecord> getStream(StreamingContext streamingContext) throws Exception {
        streamingContext.registerLineage(config.referenceName);
        JavaStreamingContext sparkStreamingContext = streamingContext.getSparkStreamingContext();

        ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
        configurationBuilder.setDebugEnabled(false)
            .setOAuthConsumerKey(config.consumerKey)
            .setOAuthConsumerSecret(config.consumerSecret)
            .setOAuthAccessToken(config.accessToken)
            .setOAuthAccessTokenSecret(config.accessTokenSecret);
        OAuthAuthorization authorization = new OAuthAuthorization(configurationBuilder.build());

        return TwitterUtils.createStream(sparkStreamingContext, authorization).map(new TweetConverter());
    }

    /**
     * Converts a status into a record of the {@code tweet} schema.
     *
     * @param status the status to convert
     * @return the populated record
     */
    public StructuredRecord convertTweet(Status status) {
        return toRecord(status);
    }

    /**
     * Builds the record for a status. {@code time} is left unset when the status has no creation
     * date, and {@code geoLat}/{@code geoLong} are left unset when it has no geo location.
     */
    private static StructuredRecord toRecord(Status status) {
        StructuredRecord.Builder builder = StructuredRecord.builder(SCHEMA);
        builder.set(ID, status.getId());
        builder.set(MSG, status.getText());
        builder.set(LANG, status.getLang());

        Date createdAt = status.getCreatedAt();
        if (createdAt != null) {
            builder.set(TIME, createdAt.getTime());
        }

        builder.set(FAVC, status.getFavoriteCount());
        builder.set(RTC, status.getRetweetCount());
        builder.set(SRC, status.getSource());

        // Fetch the location once instead of re-querying the status for each coordinate.
        GeoLocation geoLocation = status.getGeoLocation();
        if (geoLocation != null) {
            builder.set(GLAT, geoLocation.getLatitude());
            builder.set(GLNG, geoLocation.getLongitude());
        }

        builder.set(ISRT, status.isRetweet());
        return builder.build();
    }
}
