Uso del SHOW PARTITIONS en Spark

Seguro que en alguna ocasión nos ha tocado hacer un SHOW PARTITIONS de una tabla HIVE particionada, con la finalidad (para quien no lo sepa) de obtener/visualizar las particiones existentes de los datos. Hasta aquí nada nuevo, pero lo que quiero mostrarles en esta oportunidad es un método (nada espectacular) que me ha servido mucho donde obtengo las particiones de una tabla (Hive) y los usos que le he dado al método, entre otras con grandes ventajas en performance.

Como seguramente muchos de ustedes saben si invocamos un SHOW PARTITIONS en spark por ejemplo en la spark-shell, esta nos devuelve un DataFrame con una única columna, como el siguiente ejemplo:

Con mi método lo que hago es transformar el DataFrame original en uno formateado donde cada variable de particionado es una columna. A continuación el método y el que sería el DataFrame resultante

Ahora ustedes se preguntarán y que hay de fascinante o interesante en este método. La verdad que el método en sí aporta poco es sencillamente una simple transformación pero para mí la magia reside en para que lo utilizo y es lo que les quiero contar.

Imaginen que la tabla que posee las 5 variables de partición (ni discutamos si es acertado poseer 5 variables de particionamiento) posee un sin fin de particiones y que a su vez para una misma ciudad en una misma fecha hay varias particiones por hora (como aparece en el ejemplo para la ciudad de Valencia) y cada partición a su vez tiene muchos registros. Con este supuesto si quisiéramos hacer una consulta para obtener la máxima partición (la más reciente) para una fecha, ciudad, estado y país en especifico podríamos llegar a tener problemas de TimeOut o SocketTimeOut ya que:
* El cluster se vería exigido intentando trabajar sobre las particiones pertinentes (debido al gran número de particiones).
* Una vez obtenidas las particiones cargar los datos desde HDFS y recorrer de forma innecesaria un conjunto de datos requiriendo mucho más memoria de la necesaria.

¿Recorrer de forma innecesaria?
Si, ya que recorreríamos un conjunto de registros donde muchos de esto compartirán el valor de la columna «hora» (partición) y apegándonoslos al ejemplo de arriba (la ciudad de Valencia) realmente los valores posibles serían 2:
* 1700
* 1750

Solución: Pues al obtener el DataFrame de particiones, si posteriormente filtramos por país, estado, ciudad y día solo nos quedarían 2 registros para el campo hora y sencillamente tendría que hallar el máximo valor de 2 registros en vez de tener que cargar datos de HDFS e iterar sobre todos estos.

¿Mucha más memoria de la necesaria?
Si, ya que al hacer un SHOW PARTITIONS, nosotros interactuamos con el metatstore y los metadatos en vez de trabajar con todos los datos de HDFS con todo lo que eso implica en cuanto a latencia, debido a la necesidad de ir a disco, etc.

¿Existe alguna otra ventaja de trabajar con el metastore?
Si, por ejemplo para hallar la máxima partición, trabajando únicamente con el DataFrame de particiones y una vez hallada la partición idónea, digamos que la más reciente, entonces construyo la consulta (muy especifica) indicando los valores de la partición deseada evitando esos errores de TimeOut haciendo uso eficiente de los recursos. De hecho yo lo que hecho es construir un WHERE dinámico (quizás lo comparto en la próxima entrada) a partir del DataFrame de particiones filtrado.

¿Se te ocurre otra ventaja de utilizar un método como este e interactuar con el metastore? ¿Habías hecho algo similar para tener mejoras de rendimiento y uso eficiente de recursos?

Espero que les sea de utilidad.

Spark Scala con Maven en IntelliJ

Este es un post que la verdad no había tenido en mente crear pero últimamente se me ha convertido en una necesidad y la verdad he disfrutado hacer y es que en estos ya casi 5 años involucrado en temas relacionados con Big Data y la nube la verdad es que he podido notar como construir un proyecto Spark desde cero se convierte en algo fácil pero netamente basado en copiar y pegar de proyectos anteriores, pero … y qué sucede cuando no hay un proyecto anterior jejeje, pero no es el único caso y qué sucede con aquellos que están aprendiendo, es cuestión de indagar por Internet y encuentras 30 formas distintas de armar un proyecto desde cero de Spark con Scala con Maven y en un IDE en este caso IntelliJ, pero cual es la idónea, cual es la que verdaderamente funciona.

Pues he decidido crear un esqueleto de proyecto (el cual espero poder ir evolucionando y mejorarlo) que seguramente no es la mejor pero desde mi humilde punto de vista es funcional.

Configurar el IDE

Lo primero antes que nada es instalar el jsdk (1.8 como mínimo), luego en la instalación o inmediatamente después es asegurarnos de contar con los plugins de Maven y Scala, para ello en la ventana de inicio vamos a los plugins.

Buscamos el plugin de Scala para verificar que este instalado si no lo está lo instalamos y luego en la misma ventana en la parte superior junto a Marketplace hacemos clic en installed y verificamos que el plugin de maven por defecto este habilitado.

Creamos el proyecto

Seleccionamos la opción de crear un nuevo proyecto.

Ventana de inicio de IntelliJ

Acto seguido seleccionamos la opción de proyecto maven y marcamos la opción de Create from archetype. Seleccionamos el archetype net.alchim31.maven:scala-archetype-simple y pulsamos el botón «Next». Si el archetype no existe pulsamos el botón de Añadir Archetype (Add Archetype) cumplimentamos la información con los siguientes datos:
GroupId: net.alchim31.maven
ArtifactId: scala-archetype-simple
Version: 1.7

Una vez añadido lo seleccionamos y como habíamos indicado antes pulsamos el botón «Next».

Indicamos el archetype en caso de no estar presente en la lista
Lista de archetypes para crear el proyecto

Inmediatamente después le daremos nombre a nuestro proyecto y si queremos ser más específicos indicamos el GroupId, ArtifactId y versión de nuestro proyecto (OJO esto último es opcional), pulsamos «Next» y por último en la ventana resumen pulsamos «Finish».

Configuración de nuestro artifact

Lo primero que deberemos hacer para que nos facilite la tarea será habilitar la autoimportación de las dependencias maven como señalamos en la imagen.

Habilitamos la autoimportación de dependencias

El construir el proyecto a partir de un archetype (arquetipo) maven consiste en armar el esqueleto de un proyecto a partir de una plantilla definiendo una estructura minima por defecto, por lo cual veremos un fichero pom.xml (gestión de dependencias maven) con algunas dependencias y una estructura de carpetas para el código fuente y pruebas unitarias, con ficheros incluidos.

Estructura del proyecto reciéntame creado

Aprovechamos de dar un vistazo a la clase App y a las pruebas unitarias que por defecto nos añade al proyecto e incluso podemos compilar el proyecto para contrastar que todo está de maravilla y para ello solamente necesitamos hacer clic en la pestaña maven ubicado en la parte derecha, donde aparece el nombre de nuestro proyecto desplegar lifecycle y hacer doble clic en compile y esto iniciará el proceso de compilación terminando exitosamente.

Añadimos dependencias

Ya estamos llegando al final, ahora lo que haremos será añadir al fichero pom.xml las dependencias spark que utilizaremos para este ejemplo. Empezaremos por editar las propiedades quedando estas así:

Añadimos las dependencias de spark al conjunto de dependencias existentes

Por último modificaremos nuestra clase App quedando esta así:

Para de nuevo volver a compilar el proyecto, que deberá culminar exitosamente.

Ejecución

La forma que indicaremos para la ejecución de los jobs desde IntelliJ no es la mejor pero es una forma sencilla y funcional para probar cosas y sobre todo para quien comienza a hacer tests sin necesidad de empaquetar y crear un jar y desplegarlo en una máquina virtual o en un cluster. ¿Cuál sería entonces la mejor forma? A mi modo de ver las cosas la mejor forma sería mediante prueba unitarias y de integración donde podamos probar todo el job de inicio a fin y para explicarles como ya tengo en mente preparar otro post paso a paso indicando como hacerlo y las herramientas para lograrlo. Continuando con la configuración de la ejecución, si sencillamente con botón derecho del ratón hacemos clic en Run ‘App’ nos arrojará el error.

Exception in thread «main» java.lang.NoClassDefFoundError: org/apache/spark/sql/SparkSession$
at com.josedeveloper.App$.main(App.scala:13)
at com.josedeveloper.App.main(App.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.SparkSession$
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:583)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
… 2 more

Error ejecutando la clase App

El error se debe a que no encuentra las clases con las que fue compilado previamente y eso se debe a que las dependencias de spark las hemos añadido con el alcance «provided».
¿Por qué provided? Debido a que en un entorno empresarial esas dependencias no debemos agregarlas ya que las provee la infraestructura Big Data de la empresa.

Entonces para solventar el error sencillamente debemos ir al menu «Run» y hacemos clic en «Edit Configurations» y allí marcamos la opción de incluir dependencias provided (Include dependencies with «Provided» scope).

Marcamos la opción que incluya las dependencias con alcance «Provided»

Hecho eso volvemos a ejecutar la clase App y veremos como si se logra ejecutar la aplicación. Sin más espero que les haya servido de ayuda y les comento que mi próximo paso será crear un archetype (arquetipo) y a su vez explicarles como hacerlo para que cada quien pueda construir uno acorde con las necesidades de su organización y así dotamos de más profesionalidad y agilidad nuestro trabajo y evitamos el copiar+pegar donde en ocasiones terminamos añadiendo mas dependencias y plugins innecesarios así como también arrastrando problemas y errores (de haberlos).

Aquí les dejo el video

Repo GitHub

Estadística simple con Spark V2

Sigo con mi pruebas con lo nuevo (y no tan nuevo de Spark 2), hoy comparto con ustedes una versión 2 de mi anterior post Estadística simple con Spark, pero en esta ocasión realizado con Spark 2.

¿Que tiene de nuevo esta versión?

Primeramente utiliza el módulo spark-csv lo cual nos hace más simple la carga del fichero en un Dataset. Segundo, que no manipulamos en ningún instante RDD alguno, sino que por el contrario estamos trabajando con DataFrames representados mediante la clase Dataset. Entre las cosas nuevas que contempla esta versión hecha en Spark 2 es que mientras antes al realizar un groupBy sobre un DataFrame esto nos devolvía un GroupedData ahora nos devuelve un RelationalGroupedData, esto debido a un cambio de nombre que se le ha dado a partir de esta nueva versión de Spark.

Esta nueva versión realizada con SparkSQL con Datasets tiene varias ventajas, la primera es simplicidad, es mucho mas simple, mas fácil de entender el código además de mas corto, de hecho con menos lineas obtuve más información que con la versión elaborada con RDD’s, es decir, es mas versátil. Por otro lado aunque hay que tener algo de nociones de conjuntos lo interesante es que esta versión esta libre de código SQL.

Sin más dilación he aquí el código y el enlace al proyecto en Github.

Para que comparen los resultados obtenidos aquí con respecto a la entrada anterior dejo un pantallazo de lo obtenido al ejecutarlo en mi local.

promedios por distrito

promedios por distrito

Otras agregaciones por distrito

Otras agregaciones por distrito

Total personas por distrito

Total personas por distrito

Primeros pasos con Apache Spark 2

Hace pocos días salió la esperada versión 2 de Apache Spark y como algunos de ustedes saben es un framework que ahora mismo atrae mucho mi atención y como no pudo ser de otra forma hice un pequeño proyecto donde quiero ir colocando ejemplos sencillos de Spark con las nuevas (y no tan nuevas) cosas de Spark.

Para empezar comentarles que yo todavía no he utilizado sbt sino por el contrario uso maven como herramienta de construcción de proyectos. He aquí los primeros cambios necesarios para trabajar con spark 2, las dependencias correspondientes a la versión (indicadas en el pom.xml).

Entre los nuevos cambios de spark está que el punto de entrada para los programas spark ya no serán el hiveContext o sqlContext sino que han sido subsumidas en una clase llamada SparkSession. Las clases HiveContext y SQLContext se han mantenido para proporcionar retrocompatibilidad. Ejemplo

Con el SparkSession haremos lo mismo que hacíamos con sqlContext por ejemplo obtener un Dataset

O por el contrario obtener un DataFrame

Otro punto importante ha sido la unificación de las clases Dataset y DataFrame (para Java y Scala) a partir de la versión 2.¿Qué significa esto? pues sencillamente que ahora solo existirá la clase Dataset, pero proporcionará la misma funcionalidad que nos daba la clase DataFrame, de hecho basta con comparar la API en la versión 1.6.2 y 2.0.0 y ver como los métodos de la clase DataFrame están ahora incluidos en la clase Dataset.

Dataset y Dataframe en Spark 2

Dataset y Dataframe en Spark 2

Aquellos interesados en leer más acerca de Dataset y Dataframe  visitar este link

Estos no son los únicos cambios en Spark, de hechos son muchos más, que se corresponden a optimizaciones a nivel de compilación y ejecución así como también a un nuevo parseador SQL, para leer mas acerca de lo nuevo en Spark 2 clic aquí.

Aqui les dejo en enlace al proyecto donde ire añadiendo clases y seguiré probando mas cosas nuevas de Spark.

Estadística simple con Spark

Hace unos 20 días lei un articulo titulado «Simple Data Analysis Using Spark» (no publico el enlace ya que el mismo desapareció de dzone de la zona de Big Data y el mismo blog ya no existe, corroborarlo buscando por Internet), lo interesante de este articulo es que el autor hacía cálculos de estadística simple, es decir, calculaba, media, máximo y mínimo, y fue casualidad que justo en ese momento acaba de leer acerca de las operaciones numéricas de los RDD, es esta la razón que me movió a querer hacer un ejercicio similar, con estadística simple pero quería hacerlo con una información que yo considerara valiosa.

Fue entonces que me decidí a buscar alguna fuente de open data española, ya que quería arrojar u obtener datos reales, o no se quizás quería sencillamente obtener algo significativo que se acercara a una tarea real, fue así que llegue al Portal de datos abiertos de la comunidad de Madrid y después de revisar el catalogo me decidí por el padrón municipal, un fichero csv de 22 MB.

He aquí el código:

Hay una cosa que quiero resaltar, Lo simple y corto del código en Scala, tanto para la definición de la clase Padron como en las transformaciones hechas en los RDD, ya Scala esta en mi lista de cosas por aprender porque se que con Java hubiese sido el doble de código.

Por último les dejo parte de la información obtenida, la lista ordenada de población por distrito:

BARAJAS: 46166
VICALVARO: 69709
MORATALAZ: 94785
VILLA DE VALLECAS: 101011
MONCLOA-ARAVACA: 116581
RETIRO: 118390
CENTRO: 131805
USERA: 133579
CHAMBERI: 137454
VILLAVERDE: 141457
CHAMARTIN: 142334
SALAMANCA: 143123
ARGANZUELA: 151061
TETUAN: 152142
SAN BLAS-CANILLEJAS: 153303
HORTALEZA: 176320
CIUDAD LINEAL: 212626
PUENTE DE VALLECAS: 227183
LATINA: 234842
FUENCARRAL-EL PARDO: 234883
CARABANCHEL: 242032

¿Te ha parecido interesante? ¿Qué le agregarías o quitarías al código?

Si quieres acceder al GitHub donde he colgado el código pincha aquí