Apache Hadoop: Instalación y tutorial de MapReduce

Después de hacer una breve  introducción a Apache Hadoop vamos a mostrar cómo se instala en un sistema operativo GNU/Linux Ubuntu 12.10 para después poder ejecutar y crear un tutorial de MapReduce que vamos a desarrollar.

Para comenzar, descargamos la última versión estable de Apache Hadoop de su página oficial, seleccionando el archivo correspondiente a la arquitectura de nuestro procesador. Podemos elegir entre descargar un paquete instalable (rpm o deb según la distribución del sistema operativo) o bien un fichero comprimido para poder ubicarlo manualmente dónde deseemos.

En mi caso prefiero descargar el fichero comprimido “hadoop-stable.tar.gz”. Una vez descargado se descomprime:

$ tar xzf hadoop-stable.tar.gz

y posteriormente establecemos las variables de entorno (cambiar el directorio de instalación por el vuestro) para poder ejecutarlo desde cualquier ubicación:

$ export HADOOP_INSTALL=/home/$USER/hadoop-stable/

Chequeamos que esta todo correctamente comprobando la versión de hadoop instalada:

$ hadoop version
Hadoop stable
Subversion https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.1 -r 1411108
Compiled by hortonfo on Mon Nov 19 10:48:11 UTC 2012
From source with checksum 9be520e845cf135867fb8b927a40affb

Dado que ya explicamos el concepto en que se basa el concepto MapReduce en el artículo de introducción a Apache Hadoop simplemente recordamos que es un paradigma o modelo de programación que consiste en paralelizar el procesado de los datos (divide los datos en fragmentos más pequeños , los procesa para obtener la información deseada y cuando todas las partes terminan de procesarse entonces reúne o junta toda la información extraída). La fase Map sería la encargada del filtrado y agrupación de la información y la fase Reduce se encargaría del cálculo o análisis de la información.

Apache Hadoop permite tres modos de funcionamiento:

  • Standalone: Usado únicamente para desarrollo. Usa una única máquina virtual java.
  • Pseudo distribuido: Simula un cluster en una máquina local.
  • Distribuido: Ejecución en un cluster real de máquinas.

El programa que vamos a desarrollar, ejecutado en modo Standalone, simplemente va a contar el total de las ocurrencias de cada palabra en todos los ficheros de entrada que le demos. Además, descartamos algunas palabras que no tienen significado alguno para el propósito del programa que hemos desarrollado, es decir, hemos determinado que palabras como “the” o “for” no tienen valor alguno. Adicionalmente también se eliminan los caracteres especiales.

Pasemos a ver las clases que se han creado junto con su código, para que podáis estudiarlo y probarlo.

Clase WordCounterMap.java: Es la encargada de limpiar cada línea de los ficheros y captura las ocurrencias de cada palabra. Para cada palabra se van almacenando las ocurrencias que se encuentran con un formato de clave-valor. Por ejemplo; (JOB,[1]), (JOBS, [1,1,1,1]), (EXIT, [1,1…N]).

package es.happyminds.hadoop.main;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCounterMap extends Mapper<LongWritable, Text, Text, LongWritable> {

	private static ArrayList discardWords = new ArrayList();

	/**
	 * Ignore several insignificant words
	 */
	static {
		discardWords.add("A");
		discardWords.add("AN");
		discardWords.add("ONE");
		discardWords.add("TO");
		discardWords.add("THE");
		discardWords.add("FROM");
		discardWords.add("FOR");
		discardWords.add("BY");
		discardWords.add("SINCE");
	}

	/**
	 * Process each line
	 */
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

		// Clean the line deleting special and additional whitespace characters
		String line = value.toString().toUpperCase().replaceAll("[^a-zA-Z 0-9]+","").replaceAll("\\s+", " ");

		final List words = extractWordsFromLine(line);

		for (String word : words) {
			context.write(new Text(word), new LongWritable(1));
		}

	}

	private List extractWordsFromLine(String line) {

		if (line != null && !"".equals(line)) {
			return getWords(line);
		}		
		return Collections.emptyList();
	}

	private List getWords(String line) {

		final String[] words = line.split(" ");

		List finalWords = new ArrayList(words.length);
		for (String word : words) {
			if (!"".equals(word) && !discardWords.contains(word)) {
				finalWords.add(word);
			}
		}
		return finalWords;
	}
}

Clase WordCounterReduce.java: Simplemente cuenta las ocurrencias de cada palabra.

package es.happyminds.hadoop.main;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WorldCounterReduce extends Reducer<Text, LongWritable, Text, LongWritable> {	

	/**
	 * Count all occurrences of any word
	 */
	protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {

		long wordOccurrences = 0;

		for (@SuppressWarnings("unused") LongWritable value : values) {
			wordOccurrences++;
		}

		context.write(key, new LongWritable(wordOccurrences));

	}
}

Y la clase principal WordCounter.java:

package es.happyminds.hadoop.main;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCounter {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		long startTime = System.currentTimeMillis();

		if (args.length != 2) {
			System.err.println("Use: WordCounter <SOURCE_PATH> <TARGET_PATH>");
			System.exit(1);
		}

		System.out.println("Start word counter program with these parameters: "
				+ Arrays.toString(args));

		String source = args[0];
		String target = args[1];

		boolean success = false;

		try {
			success = counterWords(source, target);
		} catch (Exception e) {
			System.err.println("Error counting words.");
			e.printStackTrace(System.err);
		}

		float executionTime = (System.currentTimeMillis() - startTime) / 1000;
		System.out.println("Execution time:" + executionTime + " seconds.");
		System.out.println("Finish word counter program. Success: " + success);

		System.exit(success ? 0 : 1);

	}

	private static boolean counterWords(String source, String target)
			throws IOException, ClassNotFoundException, InterruptedException {
		Job job = new Job();
		job.setJarByClass(WordCounter.class);
		FileInputFormat.addInputPath(job, new Path(source));
		FileOutputFormat.setOutputPath(job, new Path(target));
		job.setMapperClass(WordCounterMap.class);
		job.setReducerClass(WorldCounterReduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		return job.waitForCompletion(true);
	}

}

En la clase principal se establece el rol de cada clase en el programa, los directorios de entrada y salida así como las clases de la pareja clave-valor para almacenar los resultados.

Para realizar la prueba de este programa nos situamos en el directorio raíz del proyecto y vamos a crear unas 10000 copias del archivo “/var/log/syslog” de nuestro SO dentro de una carpeta “input”:

$ mkdir input

$ for i in `seq 1 10000`; do cp /var/log/syslog input/syslog$i; done

Compilamos las clases java:

$ mkdir build

$ javac  -classpath lib/hadoop-core-stable.jar src/es/happyminds/hadoop/main/*.java -d build/

Establecemos el classpath de nuestras clases compiladas para que hadoop las encuentre (estamos ubicados en el directorio raíz del proyecto):

$  export HADOOP_CLASSPATH=./build

Y ejecutamos!

$ hadoop es.happyminds.hadoop.main.WordCounter ./input ./output

Nota: El directorio de salida “output” no debe existir de lo contrario se mostrará el correspondiente error.

Esperamos hasta que finalice el programa:

Execution time:307.0 seconds.
Finish word counter program. Success: true

Como se puede observar, ha tardado un total de 307 segundos. Hay que tener en cuenta que hadoop está pensado para tratar ficheros de gran tamaño (petabytes) sobre un filesystem creado para tratar este tipo de ficheros (HDFS, que lo veremos en el próximo tutorial) y, en este caso, ni el tamaño de los ficheros es el adecuado ni el filesystem el idóneo ni nada por el estilo, este ejemplo simplemente sirve para presentar el modo de operar con MapReduce.

Una vez que termine de ejecutarse el proceso podemos ver los resultados obtenidos en el fichero “output/part-r-00000”:

CD    30003
CMD    30003
CRON11079    10001
CRON7640    10001
CRON9661    10001
CRONDAILY    10001
CRONWEEKLY    30003
ETCCRONHOURLY    30003
EXIT    10001
JAN    80008
JOB    40004
JOBS    10001
NORMAL    10001
REPORT    30003

Este ejemplo de MapReduce es muy sencillo y de utilidad prácticamente nula en el entorno empresarial pero nos sirve para entender bastante bien el funcionamiento de esta apasionante tecnología. Sería más común buscar palabras concretas, con un formato determinado y extraer información adicional para cada palabra clave (errores, inicios de sesión, las veces que se menciona nuestra marca o empresa…).

Espero que os haya servido para tener una idea más clara y os anime a seguir profundizando en el mundo de Apache Hadoop.

Happy Minds!!!

No hay artículos relacionados.

Share on FacebookTweet about this on TwitterShare on LinkedInShare on RedditShare on Google+Digg thisShare on TumblrPin on PinterestBuffer this pagePrint this pageEmail this to someone