README.md 8.32 KB
Newer Older
Dr. Daniel Diaz Sánchez's avatar
Dr. Daniel Diaz Sánchez committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# Big Data, batch con spark

### Prerequisitos
* eclipse (disponible en el laboratorio)

### Repositorio
En el repositorio dispones de los ficheros java así como texto de entrada (parte del quijote para probarlo).




## WordCount con spark
Spark, al igual que hadoop, funciona con varios nodos de cómputo y puede usar diferentes almacenamientos (como HDFS), aunque es posible usar spark localmente usando el almacenamiento de la máquina así como un único nodo. 

Para las rutas de los ficheros de entrada, usaremos diferentes URLs del tipo:
* `file:///home/user/mifichero_entrada.txt` para el caso local
* `hdfs://namenode:port/path` para ficheros almacenados en HDFS
* `path` para rutas relativas al directorio de instalación (sin esquema)

Para usar el nodo local, cuando configuremos el contexto spark, usaremos `local[numeroNodos]`


### Proyecto y dependencias

Dr. Daniel Diaz Sánchez's avatar
Dr. Daniel Diaz Sánchez committed
25
* crea un proyecto Java en eclipse, utiliza java8 (a veces viene como Java 1.8) en los jRE, convierte el proyecto en Maven Project.
Dr. Daniel Diaz Sánchez's avatar
Dr. Daniel Diaz Sánchez committed
26

Dr. Daniel Diaz Sánchez's avatar
Dr. Daniel Diaz Sánchez committed
27
* Añade la dependencia de spark. Si buscar en https://mvnrepository.com verás que siempre se indica una vesión del lenguaje Scala (https://es.wikipedia.org/wiki/Scala_(lenguaje_de_programaci%C3%B3n) ) que soporta. La versión de Spark a utilizar en ese caso, será la versión 3.0.1 de Spark, que soporta Scala 2.12. Nosotros usaremos Java (que es compatible con Scala) pero Spark está desarrollado con Scala.
Dr. Daniel Diaz Sánchez's avatar
Dr. Daniel Diaz Sánchez committed
28
29
30
31

```xml
	<dependencies>
		<dependency>
Dr. Daniel Diaz Sánchez's avatar
Dr. Daniel Diaz Sánchez committed
32
33
34
    		<groupId>org.apache.spark</groupId>
    		<artifactId>spark-core_2.12</artifactId>
    		<version>3.0.1</version>
Dr. Daniel Diaz Sánchez's avatar
Dr. Daniel Diaz Sánchez committed
35
36
37
38
		</dependency>
	</dependencies>
``` 

Dr. Daniel Diaz Sánchez's avatar
Dr. Daniel Diaz Sánchez committed
39
* Crea una clase llamada `JavaWordCount` en el paquete `cdist` con este código:
Dr. Daniel Diaz Sánchez's avatar
Dr. Daniel Diaz Sánchez committed
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116

```java
package cdist;

import java.util.Arrays;
import java.util.Iterator;
import scala.Tuple2;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;


/*  This example has been taken from Oreally examples */

public class JavaWordCount {

	
	public static void main(String[] args) throws Exception {
		
	
		// Set the input and output default files
		// it can be a file from hdfs -> hdfs://server:port/path
		// or a local file file:///path
		// or a relative local file "fileName" under the working directory (ie. out)
		String inputFile = "file:///var/home/lab/asig/labgcd/workspace-cdist-spark-and-streaming/spark-aptel/in.txt";
		String outputFile = "out";
		
		// let the user add params optionally to define input and output file
		if(args.length > 2)
		{
			inputFile = args[0];
			outputFile = args[1];
		}
		
			
		
		// Create a Java Spark Context, for the application with name "wordCount", use a local cluster 
		// (for using a existing cluster just substitute "local" with the name of the machine
		JavaSparkContext sc = new JavaSparkContext(
			      "local", "wordcount", System.getenv("SPARK_HOME"), System.getenv("JARS"));
		
		// Load our input data. 
		// will create an inmutable (RDD) set of strings (one per line)
		JavaRDD<String> input = sc.textFile(inputFile);
		
		
		// Split up into words.
		// make a map (line -> words in that line) and make it flat (so a sequence of words irrespectively of their line)
		JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
			public Iterator<String> call(String x) {
				return Arrays.asList(x.split(" ")).iterator();
			}
		});
		
		// Transform into word and count.
		// associate 1 per word
		// and then reduce by adding all the numbers per word (key)
		
		JavaPairRDD<String, Integer> counts = words.mapToPair(new PairFunction<String, String, Integer>() {
			public Tuple2<String, Integer> call(String x) {
				return new Tuple2<String, Integer>(x, 1);
			}
		}).reduceByKey(new Function2<Integer, Integer, Integer>() {
			public Integer call(Integer x, Integer y) {
				return x + y;
			}
		});
		// Save the word count back out to a text file, causing evaluation.
		counts.saveAsTextFile(outputFile);
	}
}
```

Dr. Daniel Diaz Sánchez's avatar
Dr. Daniel Diaz Sánchez committed
117
**Analiza el código, y pruébalo** 
Dr. Daniel Diaz Sánchez's avatar
Dr. Daniel Diaz Sánchez committed
118

Dr. Daniel Diaz Sánchez's avatar
Dr. Daniel Diaz Sánchez committed
119
120
121
122
123
124
125
Para usar Spark en un solo nodo no es necesario hacer nada especial. Simplemente ejecutamos. Pero antes: 

1- asegúrate de crear un fichero de entrada de texto llamado `in.txt` dentro del proycto de Java (eso lo puedes hacer, pinchando en el proyecto, luego `new file` donde el diálogo te permitirá poner el nombre `in.txt`). 

2- Copia texto libre dentro de ese fichero (puedes copiar, por ejemplo, de la Wikipedia).

3- Como podrás ver en el código, hay una ruta para el fichero de entrada:
Dr. Daniel Diaz Sánchez's avatar
Dr. Daniel Diaz Sánchez committed
126
127
128
129
130
131
```java
String inputFile = "file:///var/home/lab/asig/labgcd/workspace-cdist-spark-and-streaming/spark-aptel/in.txt";
```

Esta ruta, tendrás que sustituirla por la ruta correcta de tu fichero `in.txt`. Para ello, pincha sobre el fichero `in.txt` en eclipse, y haz click con el botón derecho. En propiedades, dentro del atributo `location` tendrás la ubicación exacta que además podrás copiar seleccionándolo con el ratón (recuerda que debes mantener `file://` en el nombre).

Dr. Daniel Diaz Sánchez's avatar
Dr. Daniel Diaz Sánchez committed
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
4- Por útlimo, pulsa sobre la clase `JavaWordCount.java`, pulsa botón derecho, Run, luego `Java Application`.

5- Cuando termine, pulsa sobre el proyecto, botón derecho, `Refresh` y verás que hay un nuevo directorio llamado `out` donde tienes la salida con el mismo formato de Hadoop. Si haces click en `part-0000` verás la salida (ye he copiado el texto de la página de Scala de la Wikipedia):

```
(editó,1)
(scala,5)
(realiza,1)
(especiales,1)
(construcciones.,1)
(x,,3)
(Ant.,1)
(que,8)
(equipado,1)
(vez,5)
(precedidos,2)
...

```

6- Tendrás que borrar el directorio `out` antes de volverlo a ejecutar.

Dr. Daniel Diaz Sánchez's avatar
Dr. Daniel Diaz Sánchez committed
154
### Notación lambda
Dr. Daniel Diaz Sánchez's avatar
Dr. Daniel Diaz Sánchez committed
155

Dr. Daniel Diaz Sánchez's avatar
Dr. Daniel Diaz Sánchez committed
156
Java 8 soporta notación lambda (`->`) que facilita la programación y la lectura. Esta misma clase puede programarse con notación lambda. Pruebalo.
Dr. Daniel Diaz Sánchez's avatar
Dr. Daniel Diaz Sánchez committed
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238

* Crea una clase JavaWordCountDelta con el siguiente código:

```java
package cdist;

import java.util.Arrays;
import java.util.Iterator;
import scala.Tuple2;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;


/*  This example has been taken from Oreally examples */

public class JavaWordCountDelta {

	
	public static void main(String[] args) throws Exception {
		
	
		// Set the input and output default files
		// it can be a file from hdfs -> hdfs://server:port/path
		// or a local file file:///path
		// or a relative local file "fileName" under the working directory (ie. out)
		String inputFile = "file:///var/home/lab/asig/labgcd/workspace-cdist-spark-and-streaming/spark-aptel/in.txt";
		String outputFile = "out";
		
		// let the user add params optionally to define input and output file
		if(args.length > 2)
		{
			inputFile = args[0];
			outputFile = args[1];
		}
		
			
		
		// Create a Java Spark Context, for the application with name "wordCount", use a local cluster 
		// (for using a existing cluster just substitute "local" with the name of the machine
		JavaSparkContext sc = new JavaSparkContext(
			      "local", "wordcount", System.getenv("SPARK_HOME"), System.getenv("JARS"));
		
		// Load our input data. 
		// will create an inmutable (RDD) set of strings (one per line)
		JavaRDD<String> input = sc.textFile(inputFile);
		
		
		// Split up into words.
		// make a map (line -> words in that line) and make it flat (so a sequence of words irrespectively of their line)
		JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
			public Iterator<String> call(String x) {
				return Arrays.asList(x.split(" ")).iterator();
			}
		});
		
		// Transform into word and count.
		// associate 1 per word
		// and then reduce by adding all the numbers per word (key)
		
		JavaPairRDD<String, Integer> counts = words.mapToPair(new PairFunction<String, String, Integer>() {
			public Tuple2<String, Integer> call(String x) {
				return new Tuple2<String, Integer>(x, 1);
			}
		}).reduceByKey(new Function2<Integer, Integer, Integer>() {
			public Integer call(Integer x, Integer y) {
				return x + y;
			}
		});
		// Save the word count back out to a text file, causing evaluation.
		counts.saveAsTextFile(outputFile);
	}
}

```



Dr. Daniel Diaz Sánchez's avatar
Dr. Daniel Diaz Sánchez committed
239