Apache Hadoop: Ejecutar MapReduce en HDFS

hadoopEn este tutorial se va a ver como ejecutar MapReduce en HDFS. Para comprender completamente este tutorial es necesario que le echéis un vistazo a los artículos y tutoriales anteriores dependiendo del nivel que tengáis en la materia: Introducción a Apache Hadoop, Instalación y tutorial de MapReduceIntroducción a HDFS. El código fuente del programa que se va a ejecutar se encuentra en el tutorial de MapReduce.

Apache HDFSAntes de comenzar se va a realizar un breve resumen de todo lo anterior para repasar los objetivos y conceptos básicos de Apache Hadoop: El objetivo principal es poder analizar grandes cantidades de información almacenadas en ficheros, bases de datos, etc… Para el caso de los ficheros (pueden tener tera/petabytes) se usa HDFS, un sistema de ficheros pensado para almacenar ficheros de este tipo gracias a su gran tamaño de bloque (64 MB) y está orientado principalmente para ejecutar procesos batch o lecturas de ficheros de tipo “write once and read many times” (ideal para MapReduce pero no para necesidades de baja latencia) en máquinas de capacidades limitadas. En un cluster HDFS se pueden tener dos tipos o roles de nodos, el Namenode (JobTracker) para la gestión y control y otros que serán meros ejecutores Datanodes (TaskTracker).

En este caso vamos a realizar una ejecución en modo pseudo-distribuido (simula un cluster HDFS, con varios nodos, en una máquina local). Para ello, se deben realizar una serie de configuraciones y pasos adicionales que se van a ver a continuación.

Configuración del cluster HDFS

Se va a explicar tanto para versiones con MapReduce tradicional (MRv1) tanto como para YARN (MRv2), una versión mejorada de MapReduce que está presente en versiones 0.23.X.

Todos los ficheros de configración se encuentran ubicados en el directorio “conf” de la distribución Hadoop instalada en la máquina dónde se va a realizar la ejecución. En nuestro caso lo tenemos instalado de tutoriales anteriores en la siguiente ruta (usad vuestra ruta si es necesario):

/home/$USER/hadoop-stable/

Por tanto, los ficheros de configuración de Hadoop estarán en la ruta:

Versiones 1.X.X:

 /home/$USER/hadoop-stable/conf

Versiones 0.23.X:

 /home/$USER/hadoop-stable/etc/hadoop

El primer fichero se va a editar se llama “hadoop-env.sh”. Este fichero contiene las variables de entorno que van a ser requeridas. Aquí especificamos nuestro JAVA_HOME según la ruta dónde se tenga instalado el Java que se desea usar (se pueden tener varios instalados).

Buscamos las siguiente líneas:

# The java implementation to use.  Required.
# export JAVA_HOME=/usr/lib/j2sdk1.5-sun

Descomentamos el export eliminando el carácter # de comienzo de línea y se reemplaza la ruta por la de nuestra instalación de Java. En mi caso que sería:

# The java implementation to use.  Required.
export JAVA_HOME=/usr/lib/jvm/java-6-openjdk-amd64

En la versión 0.23.X el archivo es “libexec/hadoop-config.sh” que intenta cargar el JAVA_HOME por defecto. Sino se tiene como variable del sistema poner el “export” justo antes de la comprobación.

Se guarda y se continua con el fichero “core-site.xml”. En este fichero se especifican las propiedades para definir el entorno: Ruta del namenode o sistema de ficheros por defecto, tamaño del buffer de lectura, directorio temporal, etc..

Sólo se va a usar la propiedad fs.default.name para indicar el sistema de ficheros por defecto cuyo valor es hdfs://localhost/  (sistema de ficheros “hdfs”, ruta del namenode (JobTracker) “localhost” y puerto 9000 (8020 por defecto). El fichero debe quedar así:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
<property>
         <name>fs.default.name</name>
         <value>hdfs://localhost:9000</value>
    </property>
</configuration>

El siguiente fichero que se va a editar es “hdfs-site.xml” para establecer el nivel de replicación de los bloques de datos para la tolerancia a fallos. El nivel por defecto es tres pero como sólo vamos a tener un nodo hay que sobrescribirlo.

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
 <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

Ahora configuramos el datanode (TaskTracker) en el fichero “mapred-site.xml”:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
     <property>
         <name>mapred.job.tracker</name>
         <value>localhost:9001</value>
     </property>
</configuration>

Con esto ya se tiene la configuración del modo pseudo-distribuido que se va a usar.

Habilitar SSH

Ahora, hay que habilitar en nuestro ordenador la conexión mediante ssh a el filesystem HDFS definido para que Hadoop pueda inicializar los nodos. Además, se va a configurar para permitir la conexión  sin contraseña.

Primero, se tiene que tener instalado un servidor SSH, en mi caso:

sudo apt-get install ssh
sudo apt-get install rsync

Una vez instalado, probamos la conexión:

ssh localhost

The authenticity of host 'localhost (127.0.0.1)' can't be established.
ECDSA key fingerprint is 85:de:e7:cc:57:8c:2b:b8:0...
Are you sure you want to continue connecting (yes/no)?

Se responde yes a la pregunta y tras insertar la contraseña que solicita se debe realizar la conexión. Es muy probable que os pida reiniciar el sistema operativo.

El siguiente paso es permitir el acceso SSH sin necesidad de indicar el usuario y la contraseña. Para ello, primeramente se aconseja realizar un backup de las claves si ya se tienen algunas creadas (acceso a git por ejemplo). Para permitir el acceso sin inicio de sesión ejecutamos los siguientes comandos:

ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

Se prueba la conexión sin contraseña:

ssh localhost

y debe conectarse sin solicitar ningún tipo de usuario o contraseña. Salimos de esa sesión con “exit”.

Puesta en marcha y prueba de HDFS

El primer paso es darle formato al filesystem HDFS:

hadoop namenode -format

A continuación se realiza el arranque de los nodos:

/home/$USER/hadoop-stable/bin/start-all.sh

Que se puede hacer por separado según el punto del proceso en el que se esté o lo que se desee realizar:

/home/$USER/hadoop-stable/bin/start-dfs.sh
/home/$USER/hadoop-stable/bin/start-mapred.sh

En la versión 0.23.X este comando está obsoleto por lo que se ejecutará por separado cambiando “start-mapred.sh” por “start-yarn.sh” y el directorio es “sbin” en vez de “bin”.

Si todo ha ido bien aparecerá información sobre el arranque:

starting namenode, logging to /home/$USER/hadoop-stable/libexec/../logs/hadoop-$USER-namenode-$USER-laptop.out
localhost: starting datanode, logging to /home/$USER/hadoop-stable/libexec/../logs/hadoop-$USER-datanode-$USER-laptop.out
localhost: starting secondarynamenode, logging to /home/$USER/hadoop-stable/libexec/../logs/hadoop-$USER-secondarynamenode-$USER-laptop.out

Se verifica que las tareas “hadoop” están corriendo con el comando:

jps

Cuyo resultado es:

3914 TaskTracker
3679 JobTracker
3799 Jps
3590 SecondaryNameNode
3349 DataNode

y en versiones 0.23.X:

10117 NodeManager
9501 DataNode
10182 Jps
9903 ResourceManager
9762 SecondaryNameNode

Se puede ver un resumen del cluster en el panel del namenode: http://localhost:50070/dfshealth.jsp

El siguiente paso es verificar que todo funciona ejecutando uno de los ejemplos que trae la distribución. Copiamos los ficheros (ubicados en el path de hadoop):

 hadoop fs -put conf input

Ejecutamos el ejemplo para versiones anteriores a 0.23:

hadoop jar hadoop-examples-*.jar grep input output 'dfs[a-z.]+'

en otro caso:

hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar grep input output 'dfs[a-z.]+'

Se pueden copiar los ficheros de salida del proceso a una carpeta “output_example” local para poder visualizarlos:

hadoop fs -get output/* output_example

También pueden ser vistos desde el filesystem:

hadoop fs -cat output/*

Preparación de la ejecución

Ya está todo configurado y casi preparado para realizar la ejecución del programa creado en tutoriales anteriores (realiza un simple conteo de palabras). Antes, se tienen que copiar los ficheros de entrada al filesystem.

Para ello se tiene que, en primer lugar, copiar los ficheros de información que fueron creados para usar como entrada del programa. Las ordenes van a ser: crear la carpeta “wordcount”, copiar los ficheros de local a HDFS y comprobar que se han copiado correctamente:

hadoop fs -mkdir wordcount
hadoop fs -put /home/$USER/workspace/hadoop/apache-hadoop/input/* wordcount
hadoop fs -ls wordcount/

En mi caso tengo los ficheros en el directorio “/home/$USER/workspace/hadoop/apache-hadoop/input/” de los tutoriales anteriores. El proceso puede tardar un poco dependiendo del tamaño total de los ficheros. En la comprobación se muestra en la primera línea el total de ficheros que contiene.

En algunos otros ejemplos veréis que se usa -copyFromLocal en vez de -put, en otro artículo explicaremos la diferencia (mejor -put, es más corto 😉 ). De igual modo, podéis encontrar el uso de “hdfs dfs” en vez de “hadoop fs” e incluso “hadoop dfs” (deprecated).

MapReduce en cluster HDFS

Cuando se ejecuta un Job MapReduce en cluster (o pseudo-cluster que a efectos prácticos es lo mismo) se desencadena la siguiente serie de acciones (adictos al trabajo):

  1. El “Job” es lanzado desde una aplicación cliente.
  2. JobClient solicita al JobTracker un identificador de Job.
  3. JobClient procesa los datos de entrada y los divide en fragmentos de tamaño fijo.JobClient copia los recursos necesarios para ejecutar el programa al sistema de ficheros (HDFS), incluído el ejecutable o programa “jar” con la lógica de negocio, así como los fragmentos de datos.
  4. JobClient notifica a JobTracker que el tabajo está preparado para ser ejecutado.
  5. El trabajo es encolado por JobTracker.
  6. El Job Scheduler recoge el trabajo y lo inicializa. Para ello, crea un “Map Task” por cada fragmento de datos.
  7. El número de “Reduce Task” viene dado por el valor de la propiedad “mapred.reduce.tasks”. A cada tarea se le asigna un identificador.
  8. Los TaskTrackers se “enbuclan” notificando periódicamente su estado al JobTracker (heartbeat) y su disponibilidad para correr tareas. Cuando un TaskTracker está disponible, el JobTracker le asigna una tarea para ejecutar.
  9. Para correr una tarea, el TaskTracker localiza el ejecutable (jar) y los ficheros necesarios para la ejecución en el sistema distribuido de ficheros, los copia en local, descomprime el jar y crea un TaskRunner. Cada TaskRunner lanza su propia JVM, y comunica al TraskTracker el progreso de la tarea periódicamente.
  10. Cuando el JobTracker es notificado de que la última tarea ha sido completada, cambia el estado del trabajo a “successful”, notifica su finalización (si se ha configurado para ello en “job.end.notification.url”) y libera los recursos utilizados.

Ejecutar MapReduce en HDFS

Para realizar la ejecución del programa contador de palabras que se realizó previamente, es necesario generar un ejecutable o programa “jar” que contenga dicha lógica. Desde eclipse simplemente se tiene que seleccionar el proyecto, botón derecho y seleccionar la opción “Export…”. A continuación, se abre una ventana en la se debe buscar “jar” y seleccionar la opción “Jar file”.

Una vez realizado lo anterior sale una nueva ventana con el proyecto seleccionado. Se despliega y se desmarcan los directorios que no nos interesan (input y output por ejemplo). Se indica la ruta y nombre del “jar” y se pulsa el botón finalizar.

En mi caso he creado una carpeta llamada “hadoop_apps” en mi home de usuario y he guardado ahí el ejecutable bajo el nombre del “wordcount_hadoop.jar”

Y ahora, por fin se puede ejecutar el programa:

hadoop jar ../hadoop_apps/wordcount_hadoop.jar es.happyminds.hadoop.main.WordCounter wordcount file:///home/$USER/hadoop-stable/wordcount/resultado/

Siendo los parámetros del ejecutable los siguientes:

  1. La clase a ejecutar.
  2. El directorio de entrada en el HDFS: wordcount.
  3. El directorio de salida: file:///home/$USER/hadoop-stable/wordcount/resultado/. Se le indica file:// para que no guarde la salida en el HDFS sino en local. El directorio “resultado” no debe existir y su padre, “wordcount”, debe existir y tener permisos de escritura para el usuario en ejecución.

Una vez ha terminado se puede ver la salida (listo un fragmento) en el fichero “…/resultado/part-r-00000”:

EXIT	4445
JAN	35560
JOB	17780
JOBS	4445
NORMAL	4445
REPORT	13335
ROOT	13335
RUN	4445
RUNPARTS	13335
STARTED	4445
TERMINATED	8890
TIMESTAMP	4445

Para finalizar se paran los nodos:

 /home/jdzuri/hadoop-stable/bin/stop-all.sh

Al igual que el arranque, también se puede hacer por separado:

 /home/jdzuri/hadoop-stable/bin/stop-dfs.sh
 /home/jdzuri/hadoop-stable/bin/stop-mapred.sh

E igual que pasa con el arranque, para versiones 0.23.X:

 /home/jdzuri/hadoop-stable/sbin/stop-dfs.sh
 /home/jdzuri/hadoop-stable/sbin/stop-yarn.sh

Conclusiones

Este proceso ha tardado 121 segundos mientras que en modo singleton, tal y como se ejecutó en el tutorial de MapReduce, ha tardado 110 segundos (para los mismos ficheros). Sin embargo, como en aquel tutorial os insto a que no os alarméis de manera precipitada ya que tiene una explicación bastante lógica que de este modo tarde algo más. Esto es debido a varios factores importantes:

  1. Los ficheros que se tienen para trabajar con HDFS no son óptimos: Se tiene gran cantidad de ficheros de un tamaño no muy grande. Esto penaliza todo el proceso de fragmentación de los datos y lectura debido al tamaño de bloque de HDFS (fragmentos menores de 64 MB). Para el tamaño de ficheros para los que está pensada esta tecnología (tera o petabytes incluso) ejecutarlos en una sola máquina sería inviable.
  2. Modo pseudo-distribuido en local: En el modo singleton sólo se ejecutaba hadoop sobre local. Ahora se tiene adicionalmente, sobre la misma máquina, un TaskTracker y un JobTracker independientes, leyendo datos de forma remota y mediante transferencias ssh (operaciones IO más lentas).

Del mismo modo, las verdaderas ventajas de hadoop se ven con varios cluster de nodos en distintos servidores para que realmente sean operaciones distribuidas. En estos ejemplos no dejan de ser secuenciales y su aplicación es meramente para entornos de desarrollo.

En alguno de los próximos tutoriales sobre Apache Hadoop – HDFS se ejecutará este mismo programa en un entorno algo más cercano al real, en modo distribuido con varios cluster y distintos servidores.

Como siempre, espero que os haya resultado interesante y de mucha ayuda.

Happy Minds!!!

Share on FacebookTweet about this on TwitterShare on LinkedInShare on RedditShare on Google+Digg thisShare on TumblrPin on PinterestBuffer this pagePrint this pageEmail this to someone