Commit ea61c18c authored by ISAIAS PINTO GUITART's avatar ISAIAS PINTO GUITART
Browse files

Update README.md

parent ad4ef855
Pipeline #440 canceled with stages
# **iHashTag**
---------------------
## 1. Objetivo:
El objetivo de este proyecto es la utilización de las tecnologías aplicadas en la asignatura Computación Distribuida del cuarto curso del grado en Tecnologías
de la Telecomunicación de la Universidad Carlos III de Madrid.
En concreto el objetivo de este proyecto es realizar un estudio en streaming de /#Hashtags, pudiéndo observar en todo momento en una interfaz gráfica la
evolución del sentimiento de dichos tweets. Esto servirá para poder tomar conclusiones conforme a estos resultodos, o incluso aprovechar la interfaz gráfica
para poder incluso proponer cierta actuaciones para variar el sentimiento de los tweets.
Hay que dejar claro, que un sentimiento negativo en el estudio no siempre repercute de manera negativa al /#Hashtag de estudio en concreto.
## 2. Resumen:
Indicaré de manera detallada de qué está formado el proyecto y su explicación correspondiente. Ordenaré conforme a la relación que se tenga entre
las clases correspondientes.
1. StreamTwitterProducer.java
2. StreamKafkaConsumer.java
3. mongoHandler.java
4. SentimentMongo.java
5. SentimentMongo2.java
6. TweetTreatment.java
7. SentimentAnalyzer.java
8. TweetWithSentiment.java
9. main.java
10. dash_init.py
11. dash_final.py
#### 1. StreamTwitterProducer.java
En esta clase lo que hacemos en general es:
1. Configuración para el uso de la API de Twitter.
2. Configuración para el uso de un productor Kafka.
3. Uso de Api Stream de Twitter y del Broker Kafka.
4. Ingesta de datos en topicName
En este caso; primeramente deberemos realizar la configuración para poder usar la api stream de Twitter. Luego es indispensable, poder lograr una
conexión con el broker kafka al cual enviaremos los datos. En este caso, creando si fuera necesario un topic (topicName) si este no existiera; el
topicName es el parámetro tanto de filtrado en tweets como para la creación del topic (en la creación del topic se omite el caracter #).
Luego, cada vez que se nos notifique la llegada de un nuevo tweet, procederemos a enviarlo al topic asociado del broker Kafka.
Como se puede observar, en el filtrado de tweets stream, ponemos la restricción que sean tweets en Inglés debido a que la clase que usamos para
el análisis de sentimiento, sólo tiene soporte para tweets en Inglés.
```java
package iHashTag;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.ServerSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
import com.google.gson.*;
import java.util.Properties;
import java.util.Scanner;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class StreamTwitterProducer {
/*
* Variables Stream Twitter
*/
public static final String _consumerKey = "1dEBLeomZJV9e15Z8Fg41MKT7";
public static final String _consumerSecret = "B7LIx6BjSoIWDiMnt0eOrPxCCPIcQ0QntbRV57bPsD5bxWS1hU";
public static final String _accessToken = "1183717702571048965-q8iJXPOQEuUsqRvutXMiRzgytpJgcI";
public static final String _accessTokenSecret = "pIIGucIM4b3ZYxYUC2n2Cg13p0WOmOD1fkbremkCVTLnD";
private static final boolean EXTENDED_TWITTER_MODE = true;
public static void main(String[] args) {
Scanner teclado = new Scanner(System.in);
/*
* Configuración Productor Kafka
*/
System.out.println("------- BIENVENIDO AL PRODUCTOR TWITTER-KAKFA -------");
System.out.println("------- Por favor, ingresa el Hashtag a seguir ------");
String topicName = teclado.nextLine();
System.out.println("------- Ha decidido usted stremear a: "+ topicName+" ------");
teclado.close();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
//If the request fails, the producer can automatically retry,
props.put("retries", 0);
//Specify buffer size in config
//props.put("batch.size", 16384);
//Reduce the no of requests less than 0
//props.put("linger.ms", 1);
//The buffer.memory controls the total amount of memory available to the producer for buffering.
//props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String,String>(props);
ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
configurationBuilder.setOAuthConsumerKey(_consumerKey).setOAuthConsumerSecret(_consumerSecret)
.setOAuthAccessToken(_accessToken).setOAuthAccessTokenSecret(_accessTokenSecret);
TwitterStream twitterStream = new TwitterStreamFactory(configurationBuilder.build()).getInstance();
twitterStream.addListener(new StatusListener() {
/* when a new tweet arrives */
public void onStatus(Status status) {
if (status.getRetweetedStatus() == null) {
Gson gson = new Gson();
String json_string = gson.toJson(status);
producer.send(new ProducerRecord<String,String>(topicName.replace("#", ""),
json_string));
//System.out.println("Mensaje enviado");
//System.out.println(status.getText());
//System.out.println("TWEET SIN RETTWETEAR");
}else {
Gson gson = new Gson();
String json_string = gson.toJson(status);
producer.send(new ProducerRecord<String,String>(topicName.replace("#", ""),
json_string ));
//System.out.println("Mensaje enviado");
//System.out.println(status.getText());
//System.out.println("TWEET RETTWETEADO");
}
System.out.println("--------------------------------");
}
public String statusJsonImplToString(String tweet) {
String final_string;
final_string ="{content_tweet:"+tweet+"}";
return final_string;
}
@Override
public void onException(Exception arg0) {
System.out.println("Exception on twitter");
}
@Override
public void onDeletionNotice(StatusDeletionNotice arg0) {
System.out.println("Exception on twitter");
}
@Override
public void onScrubGeo(long arg0, long arg1) {
System.out.println("onScrubGeo");
}
@Override
public void onStallWarning(StallWarning arg0) {
System.out.println("EonStallWarning");
}
@Override
public void onTrackLimitationNotice(int arg0) {
System.out.println("EonTrackLimitationNotice");
}
});
FilterQuery tweetFilterQuery = new FilterQuery(); // See
tweetFilterQuery.track(new String[] {topicName}).language("en"); // , "Teletubbies"}); // OR on keywords
// ejemplo de localización (desde USA)
// tweetFilterQuery.locations(new double[][]{new double[]{-126.562500,30.448674}, new double[]{-61.171875,44.087585 }});
// See https://dev.twitter.com/docs/streaming-apis/parameters#locations for
// proper location doc.
// Note that not all tweets have location metadata set.
// ejemplo de idioma (en inglés)
/* tweetFilterQuery.language(new String[]{"en"}); */
twitterStream.filter(tweetFilterQuery);
}
}
```
#### 2. StreamKafkaConsumer.java
En esta clase lo que hacemos en general es:
1. Configuración para el uso de un consumidor Kafka.
2. Configuración para poder usar una base de datos NoSQL como MongoDB.
3. Ingesta de Documentos en MongoDB.
```java
package iHashTag;
import java.util.Properties;
import java.util.Scanner;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.json.*;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import edu.stanford.nlp.pipeline.StanfordCoreNLP;
public class StreamKafkaConsumer{
public static void main(String[] args) throws Exception {
Scanner teclado = new Scanner(System.in);
System.out.println("------- BIENVENIDO AL CONSUMIDOR TWITTER-KAKFA -------");
System.out.println("------- Por favor, ingresa el Hashtag a seguir ------");
String topicName = teclado.nextLine();
System.out.println("------- Ha decidido usted stremear a: "+ topicName+" ------");
teclado.close();
//Kafka consumer configuration settings
//String topicName = args[0].toString();
Properties props = new Properties();
int size = 0;
int offset = 0;
String variables [] = {"-10", "-11"};
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
//props.put("enable.auto.commit", "true");
//props.put("auto.commit.interval.ms", "1000");
//props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer
<String, String>(props);
mongoHandler mongodbHandler = new mongoHandler(topicName);
//Kafka Consumer subscribes list of topics here.
consumer.subscribe(Arrays.asList(topicName.replace("#", "")));
//print the topic name
System.out.println("Suscrito al topic: " + topicName.replace("#", ""));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(0);
for (ConsumerRecord<String, String> record : records) {
if(size < 100) {
mongodbHandler.saveDocument(record.value(), variables[offset]);
System.out.println("--- Documento guardado en MongoDB");
size += 1;
}else {
size = 0;
offset +=1;
if(offset < 2) {
mongodbHandler.saveDocument(record.value(), variables[offset]);
System.out.println("--- Documento guardado en MongoDB");
size += 1;
}else {
offset = 0;
mongodbHandler.saveDocument(record.value(), variables[offset]);
System.out.println("--- Documento guardado en MongoDB");
size += 1;
}
}
}
}
}
}
```
#### 3. mongoHandler.java
En esta clase lo que hacemos en general es:
1. Configuración para el uso de un conector con MongoDB
2. Obtención del timestamp para guardar en MongoDB
3. Método para guardar Documento
```java
package iHashTag;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.HashMap;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import org.bson.Document;
import org.json.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import com.mongodb.MongoException;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
public class mongoHandler {
MongoClient mongo;
MongoDatabase database;
MongoCollection collection;
public mongoHandler(String topicname) {
this.mongo = new MongoClient("localhost",27017);
this.database = mongo.getDatabase("cdistribuida");
collection = this.database.getCollection(topicname);
}
public void saveDocument(String tweet, String sentiment_value) throws JsonParseException, JsonMappingException, IOException{
HashMap<String,Object> json =
new ObjectMapper().readValue(tweet, HashMap.class);
Document document = new Document( "timestamp" ,getKeyMongo( json.get("createdAt").toString()));
document.put("sentiment", sentiment_value);
document.put("tweet", json);
collection.insertOne(document);
//System.out.println(document.isEmpty());
}
public String getKeyMongo(String Date) {
String final_key;
String parts[];
String aux_parts[];
parts = Date.split(" ");
aux_parts = parts[1].split(",");
parts[1] = aux_parts[0];
if(parts[0].contentEquals("Jan")) {
parts[0] = "1";
}
//debe seguir con los demás meses(Pero no se como aparecen en Twitter)
aux_parts = parts[parts.length -2 ].split(":");
if(parts[4].contentEquals("PM")) {
int hora = Integer.parseInt(aux_parts[0]) + 12;
aux_parts[0] = Integer.toString(hora);
}
final_key = parts[1]+"_"+parts[0]+"_"+parts[2]+"_"+aux_parts[0]+"_"+aux_parts[1];
return final_key;
}
}
```
#### 4. SentimentMongo.java
1. Configuración para el uso de un conector con MongoDB
2. Obtención del los documentos con sentiment="-10"
3. Obtención del tweet según sea retweeteado o no
4. Con ese tweet "en sucio", lo procesamos para pasarlo a limpio.
5. Pasamos ese tweet en limpio al analizador de sentimiento, que nos dará un valor entero entre [-2,2]
El objetivo de que hayan dos clases iguales es que ante la saturación, es mejor dividir un poco el proceso en dos para agilizarlo un poco.
```java
package iHashTag;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Scanner;
import com.google.gson.*;
import com.mongodb.*;
import com.mongodb.MongoClient;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import static com.mongodb.client.model.Filters.*;
import edu.stanford.nlp.pipeline.StanfordCoreNLP;
import org.bson.Document;
import org.bson.types.ObjectId;
public class SentimentMongo {
//@SuppressWarnings("deprecation")
public static void main(String[] args) {
Scanner teclado = new Scanner(System.in);
System.out.println("------- BIENVENIDO AL SENTIMENT TWEET (1) -------");
System.out.println("------- Por favor, ingresa la collection MongoDB correspondiente ------");
String topicName = teclado.nextLine();
System.out.println("------- Bind a la Collection: "+ topicName+" ------");
teclado.close();
MongoClient mongoClient = new MongoClient();
MongoDatabase database = mongoClient.getDatabase("cdistribuida");
MongoCollection<Document> collection = database.getCollection(topicName);
TweetTreatment tweet_treatment = new TweetTreatment();
SentimentAnalyzer sentimentAnalyzer = new SentimentAnalyzer();
List<TweetWithSentiment> sentiments = new ArrayList<>();
Gson gson = new GsonBuilder().setPrettyPrinting().create();
Properties properties = new Properties();
properties.setProperty("annotators", "tokenize, ssplit, parse, sentiment");
StanfordCoreNLP stanfordCoreNLP = new StanfordCoreNLP(properties);
String document_jsonstring;
String id;
String sentiment_value;
String clean_tweet;
String dirty_tweet;
/**Block<Document> printBlock = new Block<Document>() {
@Override
public void apply(final Document document) {
System.out.println(document.toJson());
}
};**/
//SPARK
//ANTERIOR
while(true) {
FindIterable<Document> prueba = collection.find(eq("sentiment", "-10"));
int i = 0;
for (Document document : prueba) {
i++;
sentiment_value = "-15";
JsonObject jsonObject = new Gson().fromJson(document.toJson(),JsonObject.class);
JsonObject jsonObject_id = (JsonObject) jsonObject.get("_id");
JsonObject jsonObject_tweet = (JsonObject) jsonObject.get("tweet");
id = jsonObject_id.get("$oid").toString().replace("\"","");
if (jsonObject_tweet.get("retweetedStatus") == null) {
dirty_tweet = jsonObject_tweet.get("text").toString();
}else {
JsonObject jsonObjectaux;
jsonObjectaux = (JsonObject) jsonObject_tweet.get("retweetedStatus");
dirty_tweet = jsonObjectaux.get("text").toString();
}
clean_tweet = tweet_treatment.tweet_cleaner(dirty_tweet);
//System.out.println(clean_tweet);
TweetWithSentiment tweetWithSentiment = sentimentAnalyzer.findSentiment(clean_tweet, stanfordCoreNLP);
if (tweetWithSentiment != null) {
sentiments.add(tweetWithSentiment);
sentiment_value = tweetWithSentiment.getCssClass();
}
//Hacemos UPDATE
BasicDBObject newDocument = new BasicDBObject();
newDocument.append("$set", new BasicDBObject().append("sentiment", sentiment_value));
ObjectId objectid = new ObjectId(id);
BasicDBObject searchQuery = new BasicDBObject().append("_id", objectid);
collection.updateOne(searchQuery, newDocument);
}
}
}
}
```
#### 5. SentimentMongo2.java
1. Configuración para el uso de un conector con MongoDB
2. Obtención del los documentos con sentiment="-11"
3. Obtención del tweet según sea retweeteado o no
4. Con ese tweet "en sucio", lo procesamos para pasarlo a limpio.
5. Pasamos ese tweet en limpio al analizador de sentimiento, que nos dará un valor entero entre [-2,2]
```java
package iHashTag;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Scanner;
import com.google.gson.*;
import com.mongodb.*;
import com.mongodb.MongoClient;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Filters;
import static com.mongodb.client.model.Filters.*;
import static com.mongodb.client.model.Projections.*;
import com.mongodb.client.model.Sorts;
import edu.stanford.nlp.pipeline.StanfordCoreNLP;
import java.util.Arrays;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.bson.Document;
import org.bson.types.ObjectId;
public class SentimentMongo2 {
//@SuppressWarnings("deprecation")
public static void main(String[] args) {
Scanner teclado = new Scanner(System.in);
System.out.println("------- BIENVENIDO AL SENTIMENT TWEET (2) -------");
System.out.println("------- Por favor, ingresa la collection MongoDB correspondiente ------");
String topicName = teclado.nextLine();
System.out.println("------- Bind a la Collection: "+ topicName+" ------");
teclado.close();
MongoClient mongoClient = new MongoClient();
MongoDatabase database = mongoClient.getDatabase("cdistribuida");
MongoCollection<Document> collection = database.getCollection(topicName);
TweetTreatment tweet_treatment = new TweetTreatment();
SentimentAnalyzer sentimentAnalyzer = new SentimentAnalyzer();
List<TweetWithSentiment> sentiments = new ArrayList<>();
Gson gson = new GsonBuilder().setPrettyPrinting().create();
Properties properties = new Properties();
properties.setProperty("annotators", "tokenize, ssplit, parse, sentiment");
StanfordCoreNLP stanfordCoreNLP = new StanfordCoreNLP(properties);
String document_jsonstring;
String id;
String sentiment_value;
String clean_tweet;
String dirty_tweet;
/**Block<Document> printBlock = new Block<Document>() {
@Override
public void apply(final Document document) {
System.out.println(document.toJson());
}
};**/
while(true) {
FindIterable<Document> prueba = collection.find(eq("sentiment", "-11"));
int i = 0;
for (Document document : prueba) {
i++;
sentiment_value = "-15";
JsonObject jsonObject = new Gson().fromJson(document.toJson(),JsonObject.class);
JsonObject jsonObject_id = (JsonObject) jsonObject.get("_id");
JsonObject jsonObject_tweet = (JsonObject) jsonObject.get("tweet");
id = jsonObject_id.get("$oid").toString().replace("\"","");;
if (jsonObject_tweet.get("retweetedStatus") == null) {
dirty_tweet = jsonObject_tweet.get("text").toString();
}else {
JsonObject jsonObjectaux;
jsonObjectaux = (JsonObject) jsonObject_tweet.get("retweetedStatus");
dirty_tweet = jsonObjectaux.get("text").toString();
}
clean_tweet = tweet_treatment.tweet_cleaner(dirty_tweet);
//System.out.println(clean_tweet);
TweetWithSentiment tweetWithSentiment = sentimentAnalyzer.findSentiment(clean_tweet, stanfordCoreNLP);
if (tweetWithSentiment != null) {
sentiments.add(tweetWithSentiment);
sentiment_value = tweetWithSentiment.getCssClass();
}