Commit 3af9a82f authored by DANIEL DIAZ SANCHEZ's avatar DANIEL DIAZ SANCHEZ
Browse files

Add new file

parents
# Big Data con Hadoop II
### Prerequisitos
**Variables de entorno**
Asegúrate de que tienes montado el sistema de la [práctica anterior](https://gitlab.pervasive.it.uc3m.es/distributed-computing-assignements/1-bigdata-hadoop-single-node).
## Análisis del código de WordCount
Para **MAP REDUCE** necesitamos un **mapper** y un **reducer**. El mapper va a asociar entradas tipo key/value a un tipo intermedio de parejas key/value.
El método a implementar sería de la clase
```java
org.apache.hadoop.mapreduce
Class Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
```
Con la signatura:
```java
protected void map(KEYIN key, VALUEIN value, Mapper.Context context)
```
`KEYIN`, `VALUEIN`, `KEYOUT`, `VALUEOUT` son tipos. Este tipo de clases java actuan de **patrón de código** ([java generic types](https://docs.oracle.com/javase/tutorial/java/generics/types.html)), es decir, es válida sea cual sea el tipo elegido de esos parámetros.
Pero vamos por partes
### Particionado del data set
En nuestro caso, recibiremos un `KEYIN` que nos proporciona hadoop, un identificador de datos de entrada, que se proporciona según el proceso de particionado de los datos de entrada. Los datos de entrada son, en nuestro caso, ficheros de texto guardados en HDFS. En clase discutimos otras tipos de entrada tiene Hadoop.
La ruta del fichero HDFS se lo pasamos al programa vía línea de comandos:
```
bin/hadoop jar wordcount.jar cdist.WordCount /user/cdist/wordcount/texto.txt /user/cdist/wordcount/output
```
Siendo `/user/cdist/wordcount/texto.txt` el fichero de entrada y `/user/cdist/wordcount/output` la ruta donde depositar la salida.
La entrada al sistema Hadoop es de tipo
```
org.apache.hadoop.mapreduce.InputFormat<K,V>
```
Este patrón de clase (o template) tendrá diferentes comportamientos en función de los parámetros K y V, que son tipos.
Si la entrada es, como en este caso, un fichero guardado en HDFS, usaremos org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> o cualquier clase que herede de ella.
Hadoop proporciona las siguientes clases para su uso, aunque es posible crear una nueva:
* [CombineFileInputFormat](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.html)
* [FixedLengthInputFormat](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/lib/input/FixedLengthInputFormat.html)
* [KeyValueTextInputFormat](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.html)
* [NLineInputFormat](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.html)
* [SequenceFileInputFormat](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFormat.html)
* [TextInputFormat](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.html)
Como los datos de entrada es texto, usaremos [TextInputFormat](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.html). La clase TextInputFormat, hereda de `FileInputFormat<LongWritable,Text>`, que a su vez hereda de `InputFormat<LongWritable,Text>`. Es decir, define `K` como `LongWritable` y `V` como `Text`.
A hadoop se le instruye acerca de qué tipo de datos hay a la entrada en el método `main` usando:
```
TextInputFormat.addInputPath(job, new Path(args[0]));
```
Una vez hecho esto, hadoop tiene que tomar los datos de entrada y, en función del tipo, partirlos para su procesamiento. Echando un vistazo la documentación de la clase TextInputFormat vemos que hadoop tomará el fichero y lo partirá en líneas.
Observa la figura para comprenderlo mejor:
<img src="" width="400px">
### Mapper
Lo bueno de hadoop es que internamente distribuye el trabajo en función de parejas `Key`/`Value` obtenidas como salida de la función `map`. Es decir, sea cual sea el tipo de datos, hadoop sólo maneja pares clave valor. Por esa razón se puede utilizar casi para lo que sea y amortizar la inversión de crear un cluster de procesamiento. De hecho como podéis ver en la imagen anterior, hadoop usa el parámetro Key de salida de map para agrupar y ordenar (shuffle/sort) antes de enviar al reducer. Por lo que es importante que el tipo de datos y los valores usados para el par key/value de salida de map tengan sentido y ayuden a resolver el problema.
Para que todo funcione, es necesario realizar el particionado adecuado según en problema, cosa que hemos visto en la sección anterior pero es igual de importante que una vez tengamos los datos particionados les asignemos un par key/value de salida coherente.
Vamos a explorar el código que habéis probado. Una vez partidos los datos, Hadoop invocará a la función `map` por cada trozo obtenido tras particionar los datos para conseguir ese par `Key`/`Value` independiente de la información de entrada que se utilzará para agrupar.
Como en el particionado usamos `K` como `LongWritable` y `V` como `Text` (`InputFormat<K,V>->InputFormat<LongWritable,Text>`) y sabiendo que la clase que implementa el método `map` deberá extender de `public class Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>` donde `KEYIN` y `VALUEIN` son los datos de entrada que nos dará el particionador de los datos de entrada implementado (según lo hemos decidido) por el uso de la clase `TextInputFormat`: `KEYIN` será `LongWritable` y `VALUEIN` será `Text`.
Ahora nosotros tenemos que decidir, qué tipo de pares `Key`/`Value`, independientes de la información usaremos a partir de ahora. Como vamos a contar palabras y las palabras se repiten, y no sabemos a priori si la línea que nos llega a la función map (que vendrá alojada en la variable value de tipo Text de la función map cuando nos llame hadoop) tiene palabras que no se vuelven a repetir en el texto, lo lógico es usar la palabra como key de salida (`KEYOUT`) de forma que si vuelve a repetirse en otra línea proporcionada por el particionador, tenga la misma clave que otras ocurrencias y hadoop pueda ordenar por palabra. Echemos un vistazo al código del mapper:
```java
public static class TokenizerMapper extends
Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
System.out.println("KEYIN:" + key.getClass().getCanonicalName() + " val : " + value.toString());
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
```
En el mapper, lo que vamos a hacer es recibir trozos de texto y mapearlos a un formato que map-reduce entienda, pueda ordenar y nos venga bien para nuestros propósitos. En concreto lo que haremos será partir el texto en palabras.
Para ello utilizamos un `StringTokenizer` que parte una cadena en trozos, uno por cada espacio y nos devuelve un iterador. Al iterar sobre el iterador, iremos accediendo a las palabras. Por cada una de ellas, emitiremos un par `key`/`value` de salida. Cada palabra contará uno. Por lo que el `KEYOUT` será `texto`, y `VALUEOUT` será un número, en concreto `1`. Por lo que asociaremos a cada palabra (no nos preocupamos de que estén repetidas o no) el valor `1` (`IntWritable one = new IntWritable(1)`).
Uno podría pensar que en lugar de dar como salida un par `[palabra,1]`, por ejemplo `[un,1]`, tantas veces como se repita la palabra "un", sería mejor comprobar si en la línea que nos han dado como resultado del particionado la palabra "un" aparece de nuevo para dar una salida agrupada, por ejemplo `[un,2]`. Por ejemplo, el texto que se propone a continuación, repite la palabra "un" dos veces en una linea:
```
Buen día. He querido mandarte un saludo. mira por la ventana: hace un
día estupendo, por fin ha salido el sol, mira!
```
Tal y como está programado, el particionador llamaría dos veces a `map`. En la primera, `map` recibiría como `KEYIN` un `long` probablemente con un identificador único que no se volverá a repetir (y que no nos interesa en este momento) y como `VALUEIN` tendríamos "Buenos días. He querido mandarte un saludo. Mira por la ventana: hace un"
Tras procesar esa entrada con el Mapper, tendríamos la siguiente salida:
```
[Buen,1]
[día,1]
[He,1]
[querido,1]
[mandarte,1]
[un,1]
[saludo,1]
[mira,1]
[por,1]
[la,1]
[ventana,1]
[hace,1]
[un,1]
```
Podríamos alterar el Mapper para que las agrupara directamente de esta forma:
```
[Buen,1]
[día,1]
[He,1]
[querido,1]
[mandarte,1]
[saludo,1]
[mira,1]
[por,1]
[la,1]
[ventana,1]
[hace,1]
[un,2]
```
Pero en realidad no es necesario, de hecho es ineficiente. Veamos por qué.
### Combiner
El procesamiento en hadoop es:
```java
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
```
Como podéis imaginar, el mapper se ejecutará en aquél nodo (JobTracker) que disponga de la información de entrada almacenada (rack aware/data locality, lo hemos visto en clase) para evitar mover los datos y permitir replicación en varios nodos. Pero el reducer, puede ejecutarse en otro nodo (máquina) diferente dado que los datos de salida del mapper pueden estar en memoria o replicarse en máquinas diferentes que las se ejecutó el mapper por disponibilidad de trozos del conjunto de entrada.
Por otro lado, el proceso de combine lo puede realizar hadoop aunque, dado que para algunos problemas puede ser interesante controlarlo, se puede implementar al igual que se hace con el Mapper y el Reducer. En código proporcionado usa la misma clase para el combine y el reduce:
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
Entonces ¿qué sentido tiene usar un combiner?
Como se ha comentado, el reducer puede ejecutarse en nodos distintos al nodo el que se ejecutó el map, pero el combiner se ejecuta a la salida de map en el mismo nodo, por lo que no es una reducción ni combinación "global" como hace el reducer distribuido, sino que es local, a la salida de la máquina en concreto que realizó el map. De esta forma damos una primera cobinación de los datos en la misma máquina en la que se hace el map para que no todo se haga en el reduce o para manipular la información con el propósito de mejorar la solución del problema.
El código del combiner es el mismo que el del reducer aunque podrían ejecutarse en diferentes máquinas:
```
public static class IntSumReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
```
Por tanto, ese agrupamiento que proponíamos en la sección anterior no tiene sentido que se realice en el Mapper, ya lo haría el combiner antes del invocar al Reducer.
### Reducer
Al igual que el combiner (en este caso) el Reducer agrupa y suma los resultados para proporcionar la solución final. Como podemos observar, hadoop hace un agrupamiento LOCAL por `KEYOUT` antes de invocar al `combiner`, y otro GLOBAL (con el resultado de la ejecución del Mapper/Combiner en cada nodo) antes de invocar al `Reducer` (dado que puede ser en otra máquina).
En ambos casos (local con el combiner o global con el reducer) a la entrada del método tenemos una key de entrada que tiene el mismo tipo que la `KEYOUT` del Mapper y una colección (no un único valor) de values con el tipo de salida del Mapper: `VALUEOUT`. La razón de tener una colección es el efecto del agrupamiento realizado por el combiner y el mecanismo de ordenación de hadoop.
Para entenderlo mejor, usaremos el texto que proponíamos antes. Supongamos que tenemos un cluster de tres nodos (nodo1, nodo2 ynodo3). Para procesar la primear línea se ejecuta el `Mapper` en el nodo1. Como resultado obtenemos:
```
[Buen,1]
[día,1]
[He,1]
[querido,1]
[mandarte,1]
[un,1]
[saludo,1]
[mira,1]
[por,1]
[la,1]
[ventana,1]
[hace,1]
[un,1]
```
En el mismo nodo1, se ejecutaría el combiner, dando como resultado:
```
[Buen,1]
[día,1]
[He,1]
[querido,1]
[mandarte,1]
[saludo,1]
[mira,1]
[por,1]
[la,1]
[ventana,1]
[hace,1]
[un,2]
```
Este resultado se almacena en memoria o en HDFS (según su tamaño). Supongamos que se almacena en HDFS y que hadoop decide guardar la información en el node3.
La segunda línea se procesa en el nodo2. La ejecución del mapper y el posterior combiner dan como resultado:
```
[día,1]
[estupendo,1]
[por,1]
[fin,1]
[ha,1]
[salido,1]
[el,1]
[sol,1]
[mira,1]
```
Este resultado se almacena también en el node3.
Después de esto, hadoop ordena los resultados anteriores y los agrupa. Y ejecuta el reducer en el node3 con la siguiente entrada:
```
[Buen,1]
[día, 1 ,1]
[He,1]
[querido,1]
[mandarte,1]
[saludo,1]
[mira, 1, 1]
[por, 1, 1]
[la,1]
[ventana,1]
[hace,1]
[un,2]
[estupendo,1]
[fin,1]
[ha,1]
[salido,1]
[el,1]
[sol,1]
```
La salida del reducer tras la ejecución en el node3 es:
```
[Buen,1]
[día, 2]
[He,1]
[querido,1]
[mandarte,1]
[saludo,1]
[mira, 2]
[por, 2]
[la,1]
[ventana,1]
[hace,1]
[un,2]
[estupendo,1]
[fin,1]
[ha,1]
[salido,1]
[el,1]
[sol,1]
```
Esta última sería la salida del sistema.
## Prueba con datos más grandes
Descarga varios libros o texto plano en general. Por ejemplo puedes obtener ficheros de texto grandes (no excesivamente) de aquí:
* [Proyecto Gutemberg] (https://www.gutenberg.org/wiki/Main_Page) (no olvides bajar las versiones en texto plano)
* http://norvig.com/big.txt (varios libros del proyecto guttemberg unidos)
### Un ejemplo básico
En este enlace http://www.gutenberg.org/ebooks/search/?sort_order=downloads puedes descargar **Alice's adventures in wonderland** en texto plano (utf8)
Guárdalos en HDFS. Para ello, se procede como antes:
```
bin/hdfs dfs -put AliceInWonderLand.txt /user/cdist/wordcount/AliceInWonderLand.txt
bin/hdfs dfs -ls /user/cdist/wordcount
Found 3 items
-rw-r--r-- 1 cdistuser supergroup 167546 2014-11-05 16:49 /user/cdist/wordcount/AliceInWonderLand.txt
drwxr-xr-x - cdistuser supergroup 0 2014-11-05 16:38 /user/cdist/wordcount/output
-rw-r--r-- 1 cdistuser supergroup 15 2014-11-05 16:36 /user/cdist/wordcount/texto.txt
```
Guárdalos todos de esa forma
### Cuenta las palabras de uno de ellos
Prueba a contar las palabras de Alice in Wonderland de la siguiente manera:
```
bin/hadoop jar wordcount.jar cdist.WordCount /user/cdist/wordcount/AliceInWonderLand.txt /user/cdist/wordcount/AIWLOutput
bin/hdfs dfs -cat /user/cdist/wordcount/AIWLOutput/*
bin/hdfs dfs -ls /user/cdist/wordcount/AIWLOutput/
Found 2 items
-rw-r--r-- 1 cdistuser supergroup 0 2014-11-05 16:51 /user/cdist/wordcount/AIWLOutput/_SUCCESS
-rw-r--r-- 1 cdistuser supergroup 58593 2014-11-05 16:51 /user/cdist/wordcount/AIWLOutput/part-r-00000
bin/hdfs dfs -get /user/cdist/wordcount/AIWLOutput/part-r-00000 alice_output.txt
```
Comprueba que funciona correctamente
### Cuenta las palabras de todos a la vez
Para ello, guarda todos los libros en un directorio de HDFS común, por ejemplo /user/cdist/wordcount/books
Para ejecutarlo sobre todos prueba (es posible que en shells como tcsh no funcione bien del todo el segundo comando):
```
bin/hadoop jar wordcount.jar cdist.WordCount /user/cdist/wordcount/books /user/cdist/wordcount/totalOutput
bin/hdfs dfs -cat /user/cdist/wordcount/totalOutput/*
```
Comprueba que funciona correctamente
## Mejoras y análisis de la ejecución
Vamos a ver si has entendido lo que hace hadoop
### Mejora el código
Como habrás podido ver, los resultados cuentan las palabras con comas y signos de puntuación e incluso caracteres blancos y no distingue entre, por ejemplo, Hola y Hola!
Para solucionarlo, modifica el método `map` limpiando la palabra (eliminando puntuación y blancos) antes de añadirla como clave de salida
Te puede resultar útil saber que para eliminar signos de puntuación se pueden utiliza regular expressions:
```
word_st.replaceAll("\\p{P}", "");
```
A continuación coloca entre el código trazas de log (usando System.out.println) para ver qué se recibe y qué se hace
Recuerda que cada vez que cambies algo en el código deberás exportar de nuevo el jar.
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment