Apache Hadoop: Reducir el consumo de ancho de banda usando Combiner

Apache Hadoop logoEn anteriores publicaciones del tutorial de Apache Hadoop que se está realizando, así como en la gran mayoría artículos que se pueden encontrar navegando por la red, se menciona la gran cantidad de ventajas y bondades que presenta esta tecnología, sin embargo, también presenta algunos inconvenientes que deben ser tratados o solventados por los desarrolladores en la medida de lo posible. En este artículo se va ver cómo reducir el consumo de ancho de banda usando Combiner.

Alto consumo de ancho de banda

Uno de los inconvenientes más notables que puede presentar el uso de MapReduce es el alto consumo de ancho de banda o tráfico de red cuando se manejan o tratan gran cantidad de datos. Este problema es causa directa de la arquitectura que presenta MapReduce al realizar la transferencia los datos analizados, por los nodos (datanodes) encargados de dicho análisis (Map), a los nodos encargados de realizar la función de agrupamiento o recolección y tratamiento final de los resultados obtenidos (Reduce).

Combiner

El Combiner es un paso intermedio en el flujo de datos MapReduce el cual es ejecutado, de forma totalmente opcional, entre el Mapper y el Reducer. Al ser un paso opcional, éste debe ser implementado y activado de forma explícita por los desarrolladores en caso de que sea necesario reducir el consumo de ancho de banda de la aplicación.

mapreduce architectureEl Combiner será ejecutado una vez por cada una de las tareas Map (por cada nodo) y justo después de la ejecucción de esta. Para ello, el Combiner de cada nodo recibirá como datos de entrada todos los datos emitidos por el Mapper de dicho nodo y la salida de éste Combiner es la que será enviada a los nodos encargados de realizar la tarea Reducer.

En resumen, el Combiner se puede definir como un pequeño proceso Reducer (mini-reducer) que opera sólamente con los datos generados por el Mapper de un nodo (habrá tantos Combiners como nodos).

Usando el sencillo ejemplo “contador de palabras” que se implementó en el artículo de instalación y tutorial de MapReduce se va a ver cómo es muy útil el uso del Combiner.

Este programa emite un par (word, 1) para cada instancia de cada palabra que encuentra por lo que si el mismo documento contiene la palabra “REPORT” diez veces, el par (“REPORT”, 1) se emite diez veces y, todas y cada una de ellas, son enviadas al Reducer. En este punto es dónde entra en acción el Combiner, ya que haciendo uso de él se puede realizar un agrupamiento previo de modo que cada nodo sólo enviaría un único par (“REPORT”, 10) al Reducer.

Como se puede intuir de forma sencilla, el hecho de que cada nodo sólo envíe un valor por cada palabra reduce drásticamente el uso del ancho de banda total requerido para el proceso “shuffle” y acelera el trabajo.

¿Cómo implementar el Combiner? La mejor parte de todo es que no es necesario escribir ningún código adicional para hacer uso del Combiner ya que, si la función Reduce es conmutativa y asociativa (en este ejemplo lo es), puede ser usada como Combiner sin ningún tipo de problema.

Para habilitar el Combiner, en el ejemplo que se ha mencionado por ejemplo, tan sólo bastaría con añadir la siguiente línea al programa principal “WordCounter.java”:

job.setCombinerClass(WorldCounterReduce.class);

El código completo de la clase “WordCounter.java” quedaría como sigue:

package es.happyminds.hadoop.main;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCounter {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		long startTime = System.currentTimeMillis();

		if (args.length != 2) {
			System.err.println("Use: WordCounter <SOURCE_PATH> <TARGET_PATH>");
			System.exit(1);
		}

		System.out.println("Start word counter program with these parameters: "
				+ Arrays.toString(args));

		String source = args[0];
		String target = args[1];

		boolean success = false;

		try {
			success = counterWords(source, target);
		} catch (Exception e) {
			System.err.println("Error counting words.");
			e.printStackTrace(System.err);
		}

		float executionTime = (System.currentTimeMillis() - startTime) / 1000;
		System.out.println("Execution time:" + executionTime + " seconds.");
		System.out.println("Finish word counter program. Success: " + success);

		System.exit(success ? 0 : 1);

	}

	private static boolean counterWords(String source, String target)
			throws IOException, ClassNotFoundException, InterruptedException {
		Job job = new Job();
		job.setJarByClass(WordCounter.class);
		FileInputFormat.addInputPath(job, new Path(source));
		FileOutputFormat.setOutputPath(job, new Path(target));
		job.setMapperClass(WordCounterMap.class);
		<strong>job.setCombinerClass(WorldCounterReduce.class);</strong>
		job.setReducerClass(WorldCounterReduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		return job.waitForCompletion(true);
	}
}

Conclusión

Como se puede apreciar tras leer este artículo es realmente sencillo reducir el consumo de ancho de banda, usando Combiner, de una aplicación MapReduce si se tiene un Reducer implementado de forma correcta. Además de esto, se puede reducir aún más el tráfico de red de una aplicación Hadoop habilitando la compresión de los datos aunque esto se verá en un artículo independiente.

Happy Minds!!!

Share on FacebookTweet about this on TwitterShare on LinkedInShare on RedditShare on Google+Digg thisShare on TumblrPin on PinterestBuffer this pagePrint this pageEmail this to someone