Commit ad4ef855 authored by ISAIAS PINTO GUITART's avatar ISAIAS PINTO GUITART
Browse files

Update images/resultado.png, images/resultado_txt.png, iHashTag_src/main.java,...

Update images/resultado.png, images/resultado_txt.png, iHashTag_src/main.java, iHashTag_src/mongoHandler.java, iHashTag_src/SentimentAnalyzer.java, iHashTag_src/SentimentMongo.java, iHashTag_src/SentimentMongo2.java, iHashTag_src/StreamKafkaConsumer.java, iHashTag_src/StreamTwitterProducer.java, iHashTag_src/TweetTreatment.java, iHashTag_src/TweetWithSentiment.java files
parent 350d8485
Pipeline #439 canceled with stages
package iHashTag;
import edu.stanford.nlp.ling.CoreAnnotations;
import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations;
import edu.stanford.nlp.pipeline.Annotation;
import edu.stanford.nlp.pipeline.StanfordCoreNLP;
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations;
import edu.stanford.nlp.trees.Tree;
import edu.stanford.nlp.util.CoreMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class SentimentAnalyzer {
public TweetWithSentiment findSentiment(String line, StanfordCoreNLP stanfordCoreNLP) {
int mainSentiment = 0;
if (line != null && !line.isEmpty()) {
int longest = 0;
Annotation annotation = stanfordCoreNLP.process(line);
for (CoreMap sentence : annotation.get(CoreAnnotations.SentencesAnnotation.class)) {
Tree tree = sentence.get(SentimentCoreAnnotations.AnnotatedTree.class);
int sentiment = RNNCoreAnnotations.getPredictedClass(tree);
String partText = sentence.toString();
if (partText.length() > longest) {
mainSentiment = sentiment;
longest = partText.length();
}
}
}
//System.out.println(mainSentiment);
if (mainSentiment == 2 || mainSentiment > 4 || mainSentiment < 0) {
//return null;
}
TweetWithSentiment tweetWithSentiment = new TweetWithSentiment(line, toCss(mainSentiment));
return tweetWithSentiment;
}
private String toCss(int sentiment) {
//System.out.println(sentiment);
switch (sentiment) {
case 0:
//return "Very Negative";
return "-2";
case 1:
//return "Negative";
return "-1";
case 2:
//return "Neutral";
return "0";
case 3:
//return "Positive";
return "1";
case 4:
//return "Very Positive";
return "2";
default:
return "0";
}
}
}
\ No newline at end of file
package iHashTag;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Scanner;
import com.google.gson.*;
import com.mongodb.*;
import com.mongodb.MongoClient;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import static com.mongodb.client.model.Filters.*;
import edu.stanford.nlp.pipeline.StanfordCoreNLP;
import org.bson.Document;
import org.bson.types.ObjectId;
public class SentimentMongo {
//@SuppressWarnings("deprecation")
public static void main(String[] args) {
Scanner teclado = new Scanner(System.in);
System.out.println("------- BIENVENIDO AL SENTIMENT TWEET (1) -------");
System.out.println("------- Por favor, ingresa la collection MongoDB correspondiente ------");
String topicName = teclado.nextLine();
System.out.println("------- Bind a la Collection: "+ topicName+" ------");
teclado.close();
MongoClient mongoClient = new MongoClient();
MongoDatabase database = mongoClient.getDatabase("cdistribuida");
MongoCollection<Document> collection = database.getCollection(topicName);
TweetTreatment tweet_treatment = new TweetTreatment();
SentimentAnalyzer sentimentAnalyzer = new SentimentAnalyzer();
List<TweetWithSentiment> sentiments = new ArrayList<>();
Gson gson = new GsonBuilder().setPrettyPrinting().create();
Properties properties = new Properties();
properties.setProperty("annotators", "tokenize, ssplit, parse, sentiment");
StanfordCoreNLP stanfordCoreNLP = new StanfordCoreNLP(properties);
String document_jsonstring;
String id;
String sentiment_value;
String clean_tweet;
String dirty_tweet;
/**Block<Document> printBlock = new Block<Document>() {
@Override
public void apply(final Document document) {
System.out.println(document.toJson());
}
};**/
//SPARK
//ANTERIOR
while(true) {
FindIterable<Document> prueba = collection.find(eq("sentiment", "-10"));
int i = 0;
for (Document document : prueba) {
i++;
sentiment_value = "-15";
JsonObject jsonObject = new Gson().fromJson(document.toJson(),JsonObject.class);
JsonObject jsonObject_id = (JsonObject) jsonObject.get("_id");
JsonObject jsonObject_tweet = (JsonObject) jsonObject.get("tweet");
id = jsonObject_id.get("$oid").toString().replace("\"","");
if (jsonObject_tweet.get("retweetedStatus") == null) {
dirty_tweet = jsonObject_tweet.get("text").toString();
}else {
JsonObject jsonObjectaux;
jsonObjectaux = (JsonObject) jsonObject_tweet.get("retweetedStatus");
dirty_tweet = jsonObjectaux.get("text").toString();
}
clean_tweet = tweet_treatment.tweet_cleaner(dirty_tweet);
//System.out.println(clean_tweet);
TweetWithSentiment tweetWithSentiment = sentimentAnalyzer.findSentiment(clean_tweet, stanfordCoreNLP);
if (tweetWithSentiment != null) {
sentiments.add(tweetWithSentiment);
sentiment_value = tweetWithSentiment.getCssClass();
}
//Hacemos UPDATE
BasicDBObject newDocument = new BasicDBObject();
newDocument.append("$set", new BasicDBObject().append("sentiment", sentiment_value));
ObjectId objectid = new ObjectId(id);
BasicDBObject searchQuery = new BasicDBObject().append("_id", objectid);
collection.updateOne(searchQuery, newDocument);
}
}
}
}
package iHashTag;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Scanner;
import com.google.gson.*;
import com.mongodb.*;
import com.mongodb.MongoClient;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Filters;
import static com.mongodb.client.model.Filters.*;
import static com.mongodb.client.model.Projections.*;
import com.mongodb.client.model.Sorts;
import edu.stanford.nlp.pipeline.StanfordCoreNLP;
import java.util.Arrays;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.bson.Document;
import org.bson.types.ObjectId;
public class SentimentMongo2 {
//@SuppressWarnings("deprecation")
public static void main(String[] args) {
Scanner teclado = new Scanner(System.in);
System.out.println("------- BIENVENIDO AL SENTIMENT TWEET (2) -------");
System.out.println("------- Por favor, ingresa la collection MongoDB correspondiente ------");
String topicName = teclado.nextLine();
System.out.println("------- Bind a la Collection: "+ topicName+" ------");
teclado.close();
MongoClient mongoClient = new MongoClient();
MongoDatabase database = mongoClient.getDatabase("cdistribuida");
MongoCollection<Document> collection = database.getCollection(topicName);
TweetTreatment tweet_treatment = new TweetTreatment();
SentimentAnalyzer sentimentAnalyzer = new SentimentAnalyzer();
List<TweetWithSentiment> sentiments = new ArrayList<>();
Gson gson = new GsonBuilder().setPrettyPrinting().create();
Properties properties = new Properties();
properties.setProperty("annotators", "tokenize, ssplit, parse, sentiment");
StanfordCoreNLP stanfordCoreNLP = new StanfordCoreNLP(properties);
String document_jsonstring;
String id;
String sentiment_value;
String clean_tweet;
String dirty_tweet;
/**Block<Document> printBlock = new Block<Document>() {
@Override
public void apply(final Document document) {
System.out.println(document.toJson());
}
};**/
while(true) {
FindIterable<Document> prueba = collection.find(eq("sentiment", "-11"));
int i = 0;
for (Document document : prueba) {
i++;
sentiment_value = "-15";
JsonObject jsonObject = new Gson().fromJson(document.toJson(),JsonObject.class);
JsonObject jsonObject_id = (JsonObject) jsonObject.get("_id");
JsonObject jsonObject_tweet = (JsonObject) jsonObject.get("tweet");
id = jsonObject_id.get("$oid").toString().replace("\"","");;
if (jsonObject_tweet.get("retweetedStatus") == null) {
dirty_tweet = jsonObject_tweet.get("text").toString();
}else {
JsonObject jsonObjectaux;
jsonObjectaux = (JsonObject) jsonObject_tweet.get("retweetedStatus");
dirty_tweet = jsonObjectaux.get("text").toString();
}
clean_tweet = tweet_treatment.tweet_cleaner(dirty_tweet);
//System.out.println(clean_tweet);
TweetWithSentiment tweetWithSentiment = sentimentAnalyzer.findSentiment(clean_tweet, stanfordCoreNLP);
if (tweetWithSentiment != null) {
sentiments.add(tweetWithSentiment);
sentiment_value = tweetWithSentiment.getCssClass();
}
//Hacemos UPDATE
BasicDBObject newDocument = new BasicDBObject();
newDocument.append("$set", new BasicDBObject().append("sentiment", sentiment_value));
ObjectId objectid = new ObjectId(id);
BasicDBObject searchQuery = new BasicDBObject().append("_id", objectid);
collection.updateOne(searchQuery, newDocument);
}
}
}
}
package iHashTag;
import java.util.Properties;
import java.util.Scanner;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.json.*;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import edu.stanford.nlp.pipeline.StanfordCoreNLP;
public class StreamKafkaConsumer{
public static void main(String[] args) throws Exception {
Scanner teclado = new Scanner(System.in);
System.out.println("------- BIENVENIDO AL CONSUMIDOR TWITTER-KAKFA -------");
System.out.println("------- Por favor, ingresa el Hashtag a seguir ------");
String topicName = teclado.nextLine();
System.out.println("------- Ha decidido usted stremear a: "+ topicName+" ------");
teclado.close();
//Kafka consumer configuration settings
//String topicName = args[0].toString();
Properties props = new Properties();
int size = 0;
int offset = 0;
String variables [] = {"-10", "-11"};
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
//props.put("enable.auto.commit", "true");
//props.put("auto.commit.interval.ms", "1000");
//props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer
<String, String>(props);
mongoHandler mongodbHandler = new mongoHandler(topicName);
//Kafka Consumer subscribes list of topics here.
consumer.subscribe(Arrays.asList(topicName.replace("#", "")));
//print the topic name
System.out.println("Suscrito al topic: " + topicName.replace("#", ""));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(0);
for (ConsumerRecord<String, String> record : records) {
if(size < 100) {
mongodbHandler.saveDocument(record.value(), variables[offset]);
System.out.println("--- Documento guardado en MongoDB");
size += 1;
}else {
size = 0;
offset +=1;
if(offset < 2) {
mongodbHandler.saveDocument(record.value(), variables[offset]);
System.out.println("--- Documento guardado en MongoDB");
size += 1;
}else {
offset = 0;
mongodbHandler.saveDocument(record.value(), variables[offset]);
System.out.println("--- Documento guardado en MongoDB");
size += 1;
}
}
}
}
}
}
package iHashTag;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.ServerSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
import com.google.gson.*;
import java.util.Properties;
import java.util.Scanner;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class StreamTwitterProducer {
/*
* Variables Stream Twitter
*/
public static final String _consumerKey = "1dEBLeomZJV9e15Z8Fg41MKT7";
public static final String _consumerSecret = "B7LIx6BjSoIWDiMnt0eOrPxCCPIcQ0QntbRV57bPsD5bxWS1hU";
public static final String _accessToken = "1183717702571048965-q8iJXPOQEuUsqRvutXMiRzgytpJgcI";
public static final String _accessTokenSecret = "pIIGucIM4b3ZYxYUC2n2Cg13p0WOmOD1fkbremkCVTLnD";
private static final boolean EXTENDED_TWITTER_MODE = true;
public static void main(String[] args) {
Scanner teclado = new Scanner(System.in);
/*
* Configuración Productor Kafka
*/
System.out.println("------- BIENVENIDO AL PRODUCTOR TWITTER-KAKFA -------");
System.out.println("------- Por favor, ingresa el Hashtag a seguir ------");
String topicName = teclado.nextLine();
System.out.println("------- Ha decidido usted stremear a: "+ topicName+" ------");
teclado.close();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
//If the request fails, the producer can automatically retry,
props.put("retries", 0);
//Specify buffer size in config
//props.put("batch.size", 16384);
//Reduce the no of requests less than 0
//props.put("linger.ms", 1);
//The buffer.memory controls the total amount of memory available to the producer for buffering.
//props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String,String>(props);
ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
configurationBuilder.setOAuthConsumerKey(_consumerKey).setOAuthConsumerSecret(_consumerSecret)
.setOAuthAccessToken(_accessToken).setOAuthAccessTokenSecret(_accessTokenSecret);
TwitterStream twitterStream = new TwitterStreamFactory(configurationBuilder.build()).getInstance();
twitterStream.addListener(new StatusListener() {
/* when a new tweet arrives */
public void onStatus(Status status) {
if (status.getRetweetedStatus() == null) {
Gson gson = new Gson();
String json_string = gson.toJson(status);
producer.send(new ProducerRecord<String,String>(topicName.replace("#", ""),
json_string));
//System.out.println("Mensaje enviado");
//System.out.println(status.getText());
//System.out.println("TWEET SIN RETTWETEAR");
}else {
Gson gson = new Gson();
String json_string = gson.toJson(status);
producer.send(new ProducerRecord<String,String>(topicName.replace("#", ""),
json_string ));
//System.out.println("Mensaje enviado");
//System.out.println(status.getText());
//System.out.println("TWEET RETTWETEADO");
}
System.out.println("--------------------------------");
}
public String statusJsonImplToString(String tweet) {
String final_string;
final_string ="{content_tweet:"+tweet+"}";
return final_string;
}
@Override
public void onException(Exception arg0) {
System.out.println("Exception on twitter");
}
@Override
public void onDeletionNotice(StatusDeletionNotice arg0) {
System.out.println("Exception on twitter");
}
@Override
public void onScrubGeo(long arg0, long arg1) {
System.out.println("onScrubGeo");
}
@Override
public void onStallWarning(StallWarning arg0) {
System.out.println("EonStallWarning");
}
@Override
public void onTrackLimitationNotice(int arg0) {
System.out.println("EonTrackLimitationNotice");
}
});
FilterQuery tweetFilterQuery = new FilterQuery(); // See
tweetFilterQuery.track(new String[] {topicName}).language("en"); // , "Teletubbies"}); // OR on keywords
// ejemplo de localización (desde USA)
// tweetFilterQuery.locations(new double[][]{new double[]{-126.562500,30.448674}, new double[]{-61.171875,44.087585 }});
// See https://dev.twitter.com/docs/streaming-apis/parameters#locations for
// proper location doc.
// Note that not all tweets have location metadata set.
// ejemplo de idioma (en inglés)
/* tweetFilterQuery.language(new String[]{"en"}); */
twitterStream.filter(tweetFilterQuery);
}
}
package iHashTag;
import com.vdurmont.emoji.EmojiParser;
public class TweetTreatment {
public String tweet_cleaner(String dirty_tweet) {
dirty_tweet = dirty_tweet.replace("\""," ");
dirty_tweet = dirty_tweet.replace("\n", " ").replace("\r", " ");
dirty_tweet = EmojiParser.removeAllEmojis(dirty_tweet);
System.out.println(dirty_tweet);
String clean_tweet = "";
String parse[] = dirty_tweet.split(" ");
int index = 0;
for (String element : parse) {
//System.out.println(element);
if(element.startsWith("#")){
parse[index] = "@CLEAN@";
}else if(element.contains("\\n")){
parse[index] = element.replace("\\n", " ");
if(parse[index].startsWith("http")) {
parse[index] = "@CLEAN@";
}
}else if(element.startsWith("http")) {
parse[index] = "@CLEAN@";
}
index++;
}
for (String clean_element : parse) {
if(!clean_element.contentEquals("@CLEAN@")) {
clean_tweet+=clean_element+" ";
}
}
clean_tweet.replaceAll("[^\\dA-Za-z' '.,;']", "");
System.out.println("TWETT LIMPIO: " +clean_tweet);
return clean_tweet;
}
}
package iHashTag;
public class TweetWithSentiment {
private String line;
private String cssClass;
public TweetWithSentiment() {
}
public TweetWithSentiment(String line, String cssClass) {
super();
this.line = line;
this.cssClass = cssClass;
}