package co.cask.cdap.template.etl.realtime.source;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.templates.plugins.PluginConfig;
import co.cask.cdap.template.etl.api.Emitter;
import co.cask.cdap.template.etl.api.realtime.RealtimeContext;
import co.cask.cdap.template.etl.api.realtime.RealtimeSource;
import co.cask.cdap.template.etl.api.realtime.SourceState;
import com.google.common.collect.Queues;
import java.util.Date;
import java.util.Queue;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;

@Name("Twitter")
@Description("Twitter Realtime Source")
@Plugin(type = TwitterSource.SRC)
/* loaded from: input_file:co/cask/cdap/template/etl/realtime/source/TwitterSource.class */
public class TwitterSource extends RealtimeSource<StructuredRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(TwitterSource.class);
    private static final String CONSUMER_KEY = "ConsumerKey";
    private static final String CONSUMER_SECRET = "ConsumerSecret";
    private static final String ACCESS_TOKEN = "AccessToken";
    private static final String ACCESS_SECRET = "AccessTokenSecret";
    private static final String ID = "id";
    private static final String MSG = "message";
    private static final String LANG = "lang";
    private static final String TIME = "time";
    private static final String FAVC = "favCount";
    private static final String RTC = "rtCont";
    private static final String SRC = "source";
    private static final String GLAT = "geolat";
    private static final String GLNG = "geoLong";
    private static final String ISRT = "isRetweet";
    private TwitterStream twitterStream;
    private StatusListener statusListener;
    private Queue<Status> tweetQ = Queues.newConcurrentLinkedQueue();
    private StructuredRecord.Builder recordBuilder;
    private final TwitterConfig twitterConfig;

    /* loaded from: input_file:co/cask/cdap/template/etl/realtime/source/TwitterSource$TwitterConfig.class */
    public static class TwitterConfig extends PluginConfig {

        @Name(TwitterSource.CONSUMER_KEY)
        @Description("Consumer Key")
        private String consumerKey;

        @Name(TwitterSource.CONSUMER_SECRET)
        @Description("Consumer Secret")
        private String consumeSecret;

        @Name(TwitterSource.ACCESS_TOKEN)
        @Description("Access Token")
        private String accessToken;

        @Name(TwitterSource.ACCESS_SECRET)
        @Description("Access Token Secret")
        private String accessTokenSecret;

        public TwitterConfig(String str, String str2, String str3, String str4) {
            this.consumerKey = str;
            this.consumeSecret = str2;
            this.accessToken = str3;
            this.accessTokenSecret = str4;
        }
    }

    public TwitterSource(TwitterConfig twitterConfig) {
        this.twitterConfig = twitterConfig;
    }

    private StructuredRecord convertTweet(Status status) {
        this.recordBuilder.set(ID, Long.valueOf(status.getId()));
        this.recordBuilder.set("message", status.getText());
        this.recordBuilder.set(LANG, status.getLang());
        this.recordBuilder.set(TIME, Long.valueOf(convertDataToTimeStamp(status.getCreatedAt())));
        this.recordBuilder.set(FAVC, Integer.valueOf(status.getFavoriteCount()));
        this.recordBuilder.set(RTC, Integer.valueOf(status.getRetweetCount()));
        this.recordBuilder.set(SRC, status.getSource());
        if (status.getGeoLocation() != null) {
            this.recordBuilder.set(GLAT, Double.valueOf(status.getGeoLocation().getLatitude()));
            this.recordBuilder.set(GLNG, Double.valueOf(status.getGeoLocation().getLongitude()));
        } else {
            this.recordBuilder.set(GLAT, Double.valueOf(-1.0d));
            this.recordBuilder.set(GLNG, Double.valueOf(-1.0d));
        }
        this.recordBuilder.set(ISRT, Boolean.valueOf(status.isRetweet()));
        return this.recordBuilder.build();
    }

    private long convertDataToTimeStamp(Date date) {
        return System.nanoTime() - (date.getTime() * 1000000);
    }

    @Nullable
    public SourceState poll(Emitter<StructuredRecord> emitter, SourceState sourceState) {
        if (!this.tweetQ.isEmpty()) {
            emitter.emit(convertTweet(this.tweetQ.remove()));
        }
        return sourceState;
    }

    public void initialize(RealtimeContext realtimeContext) throws Exception {
        super.initialize(realtimeContext);
        System.setProperty("twitter4j.loggerFactory", "twitter4j.NullLoggerFactory");
        this.recordBuilder = StructuredRecord.builder(Schema.recordOf("tweet", new Schema.Field[]{Schema.Field.of(ID, Schema.of(Schema.Type.LONG)), Schema.Field.of("message", Schema.of(Schema.Type.STRING)), Schema.Field.of(LANG, Schema.of(Schema.Type.STRING)), Schema.Field.of(TIME, Schema.of(Schema.Type.LONG)), Schema.Field.of(FAVC, Schema.of(Schema.Type.INT)), Schema.Field.of(RTC, Schema.of(Schema.Type.INT)), Schema.Field.of(SRC, Schema.of(Schema.Type.STRING)), Schema.Field.of(GLAT, Schema.of(Schema.Type.DOUBLE)), Schema.Field.of(GLNG, Schema.of(Schema.Type.DOUBLE)), Schema.Field.of(ISRT, Schema.of(Schema.Type.BOOLEAN))}));
        this.statusListener = new StatusListener() { // from class: co.cask.cdap.template.etl.realtime.source.TwitterSource.1
            public void onStatus(Status status) {
                TwitterSource.this.tweetQ.add(status);
            }

            public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
            }

            public void onTrackLimitationNotice(int i) {
            }

            public void onScrubGeo(long j, long j2) {
            }

            public void onStallWarning(StallWarning stallWarning) {
            }

            public void onException(Exception exc) {
            }
        };
        ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
        configurationBuilder.setDebugEnabled(false).setOAuthConsumerKey(this.twitterConfig.consumerKey).setOAuthConsumerSecret(this.twitterConfig.consumeSecret).setOAuthAccessToken(this.twitterConfig.accessToken).setOAuthAccessTokenSecret(this.twitterConfig.accessTokenSecret);
        this.twitterStream = new TwitterStreamFactory(configurationBuilder.build()).getInstance();
        this.twitterStream.addListener(this.statusListener);
        this.twitterStream.sample();
    }

    public void destroy() {
        if (this.twitterStream != null) {
            this.twitterStream.shutdown();
        }
    }
}
