Primer ejemplo con apache spark
Hace algunas semanas atrás que empezamos a trabajar con apache spark en el máster, brevemente les contare mis impresiones desde mi punto de vista como principiante.
Apache spark me gusto, ¿por qué? porque se puede programar en Scala (además de Python y Java), la API de Scala lo simplifica mucho y la cantidad de código a teclear es (considerablemente) menor a la necesaria para hacer la misma tarea en Java, aunque aquí debo hacer un paréntesis, ya que con la entrada de Java 8 y las lambda expresión la cantidad de código será menor pero insisto la API de Scala a mi modo de ver lo hace mas sencillo.
Nos permite hacer operaciones con mucha data (quizás no Big Data, es decir no hablamos de TeraBytes) sin necesidad de usar Hadoop, me gustaría hacer hincapié en esto ya que muchos piensan que Big Data es Hadoop y NO, no es así, mi concepto de Big Data es un poco mas amplio y contempla mas cosas, aquí me refiero específicamente a la API MapReduce de Hadoop la cual es mas complicada de usar que la de Spark, no obstante si necesitamos trabajar con una cantidad de datos considerable que requiera del uso de HDFS, podemos desde Spark acceder a este sistema de archivo u algún otro como por ejemplo Amazon S3.
Por último decirles que Spark es mas rapido, esto se debe a su arquitectura, ya que este trabaja en memoria (todo en memoria si tiene la capacidad de cargar todos los datos por completo) y pues si llegamos a hacer varias tareas (lo que técnicamente sería reutilizar los RDD ya guardados en memoria) con los datos en memoria es cuando mas jugo se le sacará a Spark.
Sinceramente si alguien discrepa de mi o coincide me entusiasmaría mucho que comparta su opinión ya que todo este mundo que envuelve el Big Data me tiene super enganchado.
Ahora bien vamos con los ejemplos, estas formaban parte de la primera tarea de Spark que tuvimos que realizar, y consiste en 2 ejemplos:
- Dado un fichero de texto en formato csv con información de medallistas olímpicos, obtener el numero de medallas por edad.
- Dado el mismo fichero de texto anterior, y un esquema de puntuación por tipo de medalla obtener un ranking de medallistas olímpicos.
He aqui el código:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
package org.dummy import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ object ScalaApp extends App { val logFile = "OlympicAthletes.csv" val sc = new SparkContext("local", "Simple", "$SPARK_HOME" , List("target/spark-ejercicio-1.0.jar")) //Todo el fichero csv es cargado en este RDD val file = sc.textFile(logFile) //linea a linea se aplica un split o separacion utilizando la "," //eso nos arrojara un arreglo de palabras (o numeros) //y construimos una instancia del tipo OlympicMedalRecords val olympicMedalRecordsRDD = file.map(x => { val arr = x.split(",") new OlympicMedalRecords(arr(0), Integer.parseInt(arr(1)), arr(2) , Integer.parseInt(arr(3)), arr(5), Integer.parseInt(arr(6)), Integer.parseInt(arr(7)), Integer.parseInt(arr(8))) } ) //Mis ejercicios //Ejercicio 1 //mapeo del tipo (edad, num de medallas), val medal es un RDD val medal = olympicMedalRecordsRDD.map(record => (record.getAge, record.getGoldMedals + record.getSilverMedals + record.getBronzeMedals)) //creamos un nuevo RDD el cual manejara la suma de todas las madellas por edad val sum = medal.reduceByKey((acc: Int,value: Int)=>acc+value) println("Lista de Medallas ordenado por Edad") //ordenamos, juntamos y luego recorremos para mostrar todos los valores sum.sortBy(_._1).collect.foreach(println) //Ejercicio 2 hacer un ranking por atleta // ya que oro = 3 ptos, plata = 2ptos, bronce = 1pto //sportGuy es otro RDD y aqui hacemos un mapeo del tipo (nombre_atleta, puntos_por_medalla) val sportGuy = olympicMedalRecordsRDD.map(record => (record.getName, 3*record.getGoldMedals + 2*record.getSilverMedals + record.getBronzeMedals)) //ranking es a su vez otro RDD y de igual manera como se hizo en el ejemplo anterior se agrupa por nombre de atleta // y también utilizamos una variable acumuladora acc donde vamos sumando los ptos de un determinado atleta val ranking = sportGuy.reduceByKey((acc,value)=>acc+value) println("Ranking de deportistas") // ordenamos en sentido inverso y mostramos los atletas con mas ptos ranking.sortBy(_._2, false).collect.foreach(println) } |
Una última cosa que es bueno saber para aquellos que como yo están empezando con Spark y sus RDD, Los RDD tienen 2 tipos de operaciones: transformaciones y acciones, las primeras son operaciones que como resultado nos devuelven otro RDD y las acciones son aquellas que hacen «algo» sobre los datos como por ejemplo el foreach de mi código, utilizado para mostrar por pantalla los valores finales, además los RDD (como hasta ahora lo entiendo) son las instrucciones a realizar sobre los datos, mas no almacenan la información, además estás instrucciones no se realizan de inmediato, sino que al contrario estos tienen un funcionamiento lazy, esto significa que se ejecutan cada vez que se realiza una operación de tipo acción.
Otra sugerencia, no duden en cacharrear con la consola de spark, de hecho el primer ejercicio lo llegue a hacer por la consola (sin los JavaBeans) y el practicar por aquí si que ayuda a conocer los operaciones a utilizar sobre los RDD.
Aquellos interesados en descargar el código, les dejo la dirección del ejercicio en mi repositorio github
El esqueleto del ejercicio esta en el repositorio del que fue nuestro profesor y pueden acceder a el haciendo clic aquí.
Cualquier material, idea o contenido que quieres compartir, no dudes en hacerlo, ya que bien recibido será. Ahora si para cerrar les dejo el enlace a un libro que hasta ahora me esta pareciendo muy bueno para aprender Spark (además con ejemplos elaborados en Java, Scala y Python).
Buenas tardes Jose,
estoy realizando mi proyecto final de carrera sobre Big Data Platforms y ahora me toca investigar un poco sobre Apache Spark.
Se mas o menos como es el funcionamiento, pero tengo una duda que quizas tu me puedas resolver.
Por lo que tengo entendido, los RDD son objetos que se almacenan en memoria cache y sobre los que solo podemos realizar dos tipos de operaciones. Mi duda es, ¿cuantos RDDs se crean?. Por ejemplo, yo quiero cargar un fichero de datos .csv a Apache Spark para trabajar con ellos. ¿Cuantos RDDs se crean? ¿Que es lo que se guarda directamente en los RDD? ¿Almacenan los datos del fichero?
Muchas gracias por ayudarme.
Atentamente,
David.
Hola David, perdona la tardanza, no soy un experto en el tema pero intentaré ayudarte.
Los RDD son colecciones de objetos donde a su vez especificamos «la instrucción» de que ha de aplicarse sobre los datos (filtrados, transformaciones, etc…).
Así como tu bien dices los RDD permiten realizar 2 cosas: Acciones y Transformaciones (dentro de esta categoría está incluido el filtrado), sabras distinguir entre ambas ya que las transformaciones generan/devuelven otro RDD.
Para la carga de un csv por ejemplo en scala sería algo así
val textFile = sc.textFile(«file.csv») //carga de un csv esta operación devuelve un RDD
textFile.first() //Devuelve la primera linea del csv. Esto es una acción
Las lineas anteriores generarían un único RDD, el detalle esta en que si nosotros no persistimos o guardamos en cache el RDD, si ejecutáramos otra acción a continuación como por ejemplo textFile.take(5) entonces se cargaría de nuevo el fichero csv.
¿Como deberíamos hacer entonces?
Se me ocurre hacer el código de la siguiente manera
val textFile = sc.textFile(«file.csv») //carga de un csv esta operación devuelve un RDD
textFile.cache() //guardamos en cache (memoria)
textFile.first() //Devuelve la primera linea del csv. Esto es una acción
textFile.take(5) //Devuelve 5 elementos y ya no cargaría de nuevo el fichero csv
NOTA: Los RDD operan de forma lazy (perezosa), es decir, las instrucción o computo se realiza al ejecutarse una acción, por ende de acuerdo al ejemplo anterior el RDD que contiene el fichero csv es guardado en cache al momento de ejecutar la primera acción (textFile.first()) y al momento de ejecutarse la segunda opción ya el csv esta en cache y no es necesario que se cargue de nuevo.
Por último hay algunas acciones donde spark implícitamente guarda en cache sin que explícitamente invoquemos la función cache() o persist(), por ejemplo al ejecutar la acción reduceByKey.
Amigo, cómo se ejecuta el ejemplo de los medallistas?
Amigo, cómo se ejecuta el ejemplo de los medallistas?
Hola Jordan, tienes varias alternativas para poder ejecutarlo y con fines de desarrollo, es decir, pruebas, te las comento a continuación:
También aprovecho la ocasión para mostrarte una versión más actualizada de este ejercicio pero haciendo uso de DataFrames y Spark SQL en vez de RDD’s e incluso resolviendo más interrogantes. He aquí el enlace al repositorio
Hola estoy comenzando a trabajar con spark pues estoy inmersa en el tema de big data para mi tesis. Estoy comenzando con un ejemplo bastante sencillo, contar palabras de un fichero .txt de 1gb, pero me esta dando el siguiente error:
java.lang.OutOfMemoryError: Java heap space
Como hago para resolverlo, porfavor seria de mucha ayuda su respuesta, gracias atentamente Luna