Apache Storm es un framework de procesamiento distribuido de eventos. Empresas como twitter utilizaron Storm desde el 2011 aunque posteriormente lo reemplazó por Heron en el 2015.
Actualmente me encuentro trabajando en una aplicación construida con Apache Storm y quiero compartir con ustedes mi primer ejemplo con Apache Storm el cual es bastante simple pero cumplio su cometido que era el iniciarme en este framework y entender sus componentes principales.
Necesitaremos descargar rabbitMQ la versión 3.5.7.
Una vez hayamos instalado rabbitMQ (es solo cuestión de descomprimir) vamos a habilitar un plugin (un cliente web) donde podremos de forma sencilla monitorizar las colas, entonces nos ubicamos en la siguiente ruta ruta_instalacion_rabbitmq/sbin/ y desde allí ejecutamos el siguiente comando
./rabbitmq-plugins enable rabbitmq_management
Este comando habilitará el cliente web que nos permitirá monitorizar las colas, los exchanges incluso manipular las colas pudiendo ingresar elementos a las colas, sacar elementos e incluso purgar las colas. Paso siguiente iniciaremos el rabbitMQ con el siguiente comando:
./rabbitmq-server
Inmediatamente después desde un navegador nos dirigimos a la dirección http://localhost:15672
Una vez allí crearemos una cola para hacer nuestro ejemplo y la llamaremos «data».
Ahora deberíamos ser capaces de ver la única cola de nuestro sistema. Clicando en ella podríamos incluso agregarle mensajes por medio de la interfaz gráfica (si lo desean hagan la prueba y verán como cada uno de los mensajes que agreguen se irán encolando), pero la inserción de mensajes en la cola lo haremos mediante un pequeño programa Java.
Ahora un poco de teoría para conocer acerca de Apache Storm.
¿Qué es Apache Storm?
Es un framework de computación distribuida en tiempo real, escrito en su mayoría en Clojure. Storm es similar a la forma como Hadoop ofrece un conjunto de primitivas generales para hacer el procesamiento por lotes, también ofrece un conjunto de primitivas generales para hacer cómputos en tiempo real. Storm es simple, se puede utilizar con cualquier lenguaje de programación.
Las aplicaciones de Storm son creadas como topologías en la forma de DAG (Directed Acyclic Graph) con spouts y bolts actuando como los vertices del grado. Las aristas en el grafo son llamados streams y dirigen la data de un nodo a otro. Juntos, la topología actúa como una tubería de transformación de datos.
Los spouts son fuentes de flujo (streams) en una topología. Los spouts generalmente leerán tuplas desde una fuente externa y las emiten dentro de la topología.
Un bolt es donde se realiza todo el procesamiento de una topología, pueden hacer cualquier cosa, desde filtrado, funciones, agregaciones, joins, comunicarse con bases de datos y mucho más.
El ejemplo que hice y compartiré a continuación con ustedes será bastante simple, estará constituido por un spout que leerá de una cola rabbitmq (la cola data que creamos anteriormente) y ese mensaje lo insertará en la topología para posteriormente al recibirlo el bolt mostrarlo por linea de comandos (ya luego si ustedes lo desean lo que podrían hacer es que en vez de mostrarlo por linea de comandos volcar ese mensaje en otra cola de rabbitmq).
El programa java que se encarga de insertar mensajes a la cola
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
package com.josedeveloper.rabbitmq; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class SendMessagesToRabbit { public static final String message = "MENSAJE DE PRUEBA"; public static final int NUM_MENSAJES = 10000; public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { sendMessage(); } public static void sendMessage() throws IOException, InterruptedException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("guest"); //usuario por defecto de rabbitmq factory.setPassword("guest"); //password por defecto de rabbitmq factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); for (int i = 0; i < NUM_MENSAJES; i++) { final String msg = message + i; channel.basicPublish("", "data", null, msg.getBytes()); } channel.close(); connection.close(); } } |
El Spout que leerá de la cola rabbitMQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
package com.josedeveloper.topologia; import java.io.IOException; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeoutException; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class RabbitMQSpout extends BaseRichSpout { private static final long serialVersionUID = -5875062340173997062L; private SpoutOutputCollector collector; BlockingQueue messages; private final static String QUEUE_NAME = "data"; @Override public void nextTuple() { String message; while ((message = messages.poll()) != null) { collector.emit(new Values(message)); //emitimos el mensaje dentro de la topologia } } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; messages = new ArrayBlockingQueue(100); ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); try { messages.put(message); } catch (InterruptedException e) { e.printStackTrace(); } } }; channel.basicConsume(QUEUE_NAME, true, consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e1) { e1.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( new Fields( "message" ) ); //declaramos los campos que enviaremos a la topologia } } |
El bolt que leerá los datos que han sido insertados en la topología por el Spout
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
package com.josedeveloper.topologia; import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; public class MessageBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; @SuppressWarnings("unused") private OutputCollector collector; @Override public void execute(Tuple tuple) { String message = tuple.getString(0); System.out.println("--> " + message); } @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void declareOutputFields(OutputFieldsDeclarer arg0) { // TODO Auto-generated method stub } } |
Definición de la topología
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
package com.josedeveloper.topologia; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils; public class RabbitMQTopologyExample { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RabbitMQSpout()); builder.setBolt("bolt", new MessageBolt()) .shuffleGrouping("spout"); Config conf = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, builder.createTopology()); Utils.sleep(200000); cluster.killTopology("test"); cluster.shutdown(); } } |
Para ejecutar nuestro ejemplo y verlo funcionando debemos ejecutar 2 clases (el orden sería indistinto):
- SendMessagesToRabbit
- RabbitMQTopologyExample
Al ejecutar la clase SendMessagesToRabbit, podremos ver en el cliente Web de RabbitMQ como la cola tendrá 10000 mensajes encolados. Al ejecutar la topología (ejecutando la clase RabbitMQTopologyExample) podremos ver como los mensajes se van desencolando y a su vez por linea de comandos (por ejemplo de nuestro editor) veremos los mensajes que en teoría el bolt leyó y procesó.
Espero que les sea de utilidad y disfruten con este framework, desde mi punto de vista es sencillo y funciona bien, incluso la nueva herramienta que utiliza Twitter posee retrocompatibilidad con Storm por lo cual se podría empezar con un ejemplo de este tipo.
Clic aquí para ir al repositorio github.