Hace unos 20 días lei un articulo titulado “Simple Data Analysis Using Spark” (no publico el enlace ya que el mismo desapareció de dzone de la zona de Big Data y el mismo blog ya no existe, corroborarlo buscando por Internet), lo interesante de este articulo es que el autor hacía cálculos de estadística simple, es decir, calculaba, media, máximo y mínimo, y fue casualidad que justo en ese momento acaba de leer acerca de las operaciones numéricas de los RDD, es esta la razón que me movió a querer hacer un ejercicio similar, con estadística simple pero quería hacerlo con una información que yo considerara valiosa.
Fue entonces que me decidí a buscar alguna fuente de open data española, ya que quería arrojar u obtener datos reales, o no se quizás quería sencillamente obtener algo significativo que se acercara a una tarea real, fue así que llegue al Portal de datos abiertos de la comunidad de Madrid y después de revisar el catalogo me decidí por el padrón municipal, un fichero csv de 22 MB.
He aquí el código:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
package com.josedeveloper import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ case class Padron(codDistrito: String, descDistrito: String, codDistBarrio: String, descBarrio: String, codBarrio: String, codDistSeccion: String, spainMen: Integer, spainWomen: Integer, foreignerMen: Integer, foreignerWomen: Integer) { def numberOfPeople() : Int = { return spainMen + spainWomen + foreignerMen + foreignerWomen; } } object ScalaApp extends App { val sc = new SparkContext("local", "Simple Application", "$SPARK_HOME" , List("target/SimpleApp-0.1.jar")) val file = sc.textFile("Rango_Edades_Seccion_201506.csv") val data = file.map(line => line.split(";").map(_.trim)) //the csv header is excluded .mapPartitionsWithIndex((idx, iter) => if (idx == 0) iter.drop(1) else iter).persist() //Map to Padron objects val dataMapped = data.map(line => (line(1), new Padron(line(0), line(1), line(2), line(3), line(4), line(5), getIntValue(line(8)), getIntValue(line(9)), getIntValue(line(10)), getIntValue(line(11))))) //Groupped by district val grouppedData = dataMapped.reduceByKey((x: Padron, y: Padron) => new Padron(x.codDistrito, x.descDistrito, x.codDistBarrio, x.descBarrio, x.codBarrio, x.codDistSeccion, x.spainMen + y.spainMen, x.spainWomen + y.spainWomen, x.foreignerMen + y.foreignerMen, x.foreignerWomen + y.foreignerWomen)) //sorted list total spanish men by district grouppedData.collect().sortBy(_._2.spainMen) .foreach(x => println(x._2.descDistrito + "->" + x._2.spainMen)) //statistics and numerics sparks RDD values val spainMaleValeByDistrict = grouppedData.map(tupla => tupla._2.spainMen.doubleValue()).cache() val media = spainMaleValeByDistrict.mean() val stddev = spainMaleValeByDistrict.stdev() val max = spainMaleValeByDistrict.max() val min = spainMaleValeByDistrict.min() println("Media de Españoles varones por distrito: " + media.toInt) println("Desviación estandar de españoles varones por distrito: " + stddev.toInt) println("Num maximo de españoles varones en un distrito: " + max.toInt) println("Num minimo de españoles varones en un distrito: " + min.toInt) //sorted list total number of people by district grouppedData.collect().sortBy(_._2.numberOfPeople) .foreach(x => println(x._2.descDistrito + "->" + x._2.numberOfPeople)) val numberOfPeopleByDistrict = grouppedData.map(tupla => tupla._2.numberOfPeople.doubleValue).cache() val media2 = numberOfPeopleByDistrict.mean() val stddev2 = numberOfPeopleByDistrict.stdev() val max2 = numberOfPeopleByDistrict.max() val min2 = numberOfPeopleByDistrict.min() println("Media de personas por distrito: " + media2.toInt) println("Desviación estandar de personas por distrito: " + stddev2.toInt) println("Num maximo de personas en un distrito: " + max2.toInt) println("Num minimo de personas en un distrito: " + min2.toInt) def getIntValue(s:String) : Integer = { val value = s.substring(1, s.length - 1) if (value.isEmpty) return 0 return Integer.parseInt(value) } } |
Hay una cosa que quiero resaltar, Lo simple y corto del código en Scala, tanto para la definición de la clase Padron como en las transformaciones hechas en los RDD, ya Scala esta en mi lista de cosas por aprender porque se que con Java hubiese sido el doble de código.
Por último les dejo parte de la información obtenida, la lista ordenada de población por distrito:
BARAJAS: 46166
VICALVARO: 69709
MORATALAZ: 94785
VILLA DE VALLECAS: 101011
MONCLOA-ARAVACA: 116581
RETIRO: 118390
CENTRO: 131805
USERA: 133579
CHAMBERI: 137454
VILLAVERDE: 141457
CHAMARTIN: 142334
SALAMANCA: 143123
ARGANZUELA: 151061
TETUAN: 152142
SAN BLAS-CANILLEJAS: 153303
HORTALEZA: 176320
CIUDAD LINEAL: 212626
PUENTE DE VALLECAS: 227183
LATINA: 234842
FUENCARRAL-EL PARDO: 234883
CARABANCHEL: 242032
¿Te ha parecido interesante? ¿Qué le agregarías o quitarías al código?
Si quieres acceder al GitHub donde he colgado el código pincha aquí