Apache Hadoop: Crear aplicaciones YARN. Cliente y Master básicos

crear aplicaciones YARNEn este tutorial se va continuar aprendiendo cómo crear aplicaciones YARN. En el artículo anterior, crear aplicaciones YARN 1/3, se hizo una explicación del funcionamiento general, de modo teórico, de YARN así como de los elementos que intervienen en su uso.

En este, se va a continuar exponiendo el código fuente, de forma general, para llevar a cabo las operaciones y comunicaciones necesarias para que el cliente y el ApplicationMaster interactuen entre ellos y puedan llegar a ejecutar una aplicación. Todo el código estará debidamente explicado y comentado para hacer su entendimiento más fácil.

Crear aplicaciones YARN: Cliente y ApplicationMaster

Cliente

El primer paso que el cliente necesita hacer es conectarse con el RM o, para ser más específicos, con la interfaz ApplicationsManager (ASM) del RM.

    ClientRMProtocol applicationsManager; 
    YarnConfiguration yarnConf = new YarnConfiguration(conf);
    InetSocketAddress rmAddress = 
        NetUtils.createSocketAddr(yarnConf.get(
            YarnConfiguration.RM_ADDRESS,
            YarnConfiguration.DEFAULT_RM_ADDRESS));             
    LOG.info("Connecting to ResourceManager at " + rmAddress);
    configuration appsManagerServerConf = new Configuration(conf);
    appsManagerServerConf.setClass(
        YarnConfiguration.YARN_SECURITY_INFO,
        ClientRMSecurityInfo.class, SecurityInfo.class);
    applicationsManager = ((ClientRMProtocol) rpc.getProxy(
        ClientRMProtocol.class, rmAddress, appsManagerServerConf));

Después de esto, el cliente tiene que solicitar un nuevo identificador de aplicación (ApplicationId) al RM

   GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);              
    GetNewApplicationResponse response = 
        applicationsManager.getNewApplication(request);
    LOG.info("Got new ApplicationId=" + response.getApplicationId());

La respuesta desde el ASM contiene además información sobre el cluster como los recursos mínimos y máximos que posee. Esta información se requiere para poder asegurar que el contenedor de la aplicación puede cargarse correctamente y sin problemas por falta de recursos.

La clave principal de un cliente es configurar el ApplicationSubmissionContext que define toda la información que necesita el ResourceManager para cargar el ApplicationMaster. Un cliente necesita establecer en el contexto la siguiente información:

  • Información de la aplicación: ID y nombre.
  • Cola y prioridad: La cola de ejecución a la que se va a enviar la aplicación así como su prioridad.
  • Usuario: El usuario que envía la solicitud.
  • ContainerLaunchContext: La información que define el contenedor en el que se puso en marcha el ApplicationMaster (binarios, ficheros, tokens de seguridad, variables de entorno como el CLASSPATH y el comando a ejecutar entre otras cosas).
 // Create a new ApplicationSubmissionContext
    ApplicationSubmissionContext appContext = 
        Records.newRecord(ApplicationSubmissionContext.class);
    // set the ApplicationId 
    appContext.setApplicationId(appId);
    // set the application name
    appContext.setApplicationName(appName);

    // Create a new container launch context for the AM's container
    ContainerLaunchContext amContainer = 
        Records.newRecord(ContainerLaunchContext.class);

    // Define the local resources required 
    Map<String, LocalResource> localResources = 
        new HashMap<String, LocalResource>();
    // Lets assume the jar we need for our ApplicationMaster is available in 
    // HDFS at a certain known path to us and we want to make it available to
    // the ApplicationMaster in the launched container 
    Path jarPath; // <- known path to jar file  
    FileStatus jarStatus = fs.getFileStatus(jarPath);
    LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
    // Set the type of resource - file or archive
    // archives are untarred at the destination by the framework
    amJarRsrc.setType(LocalResourceType.FILE);
    // Set visibility of the resource 
    // Setting to most private option i.e. this file will only 
    // be visible to this instance of the running application
    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);          
    // Set the location of resource to be copied over into the 
    // working directory
    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath)); 
    // Set timestamp and length of file so that the framework 
    // can do basic sanity checks for the local resource 
    // after it has been copied over to ensure it is the same 
    // resource the client intended to use with the application
    amJarRsrc.setTimestamp(jarStatus.getModificationTime());
    amJarRsrc.setSize(jarStatus.getLen());
    // The framework will create a symlink called AppMaster.jar in the 
    // working directory that will be linked back to the actual file. 
    // The ApplicationMaster, if needs to reference the jar file, would 
    // need to use the symlink filename.  
    localResources.put("AppMaster.jar",  amJarRsrc);    
    // Set the local resources into the launch context    
    amContainer.setLocalResources(localResources);

    // Set up the environment needed for the launch context
    Map<String, String> env = new HashMap<String, String>();    
    // For example, we could setup the classpath needed.
    // Assuming our classes or jars are available as local resources in the
    // working directory from which the command will be run, we need to append
    // "." to the path. 
    // By default, all the hadoop specific classpaths will already be available 
    // in $CLASSPATH, so we should be careful not to overwrite it.   
    String classPathEnv = "$CLASSPATH:./*:";    
    env.put("CLASSPATH", classPathEnv);
    amContainer.setEnvironment(env);

    // Construct the command to be executed on the launched container 
    String command = 
        "${JAVA_HOME}" + /bin/java" +
        " MyAppMaster" + 
        " arg1 arg2 arg3" + 
        " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
        " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";                     

    List<String> commands = new ArrayList<String>();
    commands.add(command);
    // add additional commands if needed                

    // Set the command array into the container spec
    amContainer.setCommands(commands);

    // Define the resource requirements for the container
    // For now, YARN only supports memory so we set the memory 
    // requirements. 
    // If the process takes more than its allocated memory, it will 
    // be killed by the framework. 
    // Memory being requested for should be less than max capability 
    // of the cluster and all asks should be a multiple of the min capability. 
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(amMemory);
    amContainer.setResource(capability);

    // Set the container launch content into the ApplicationSubmissionContext
    appContext.setAMContainerSpec(amContainer);

Después de que el proceso de instalación se haya completado, el cliente envía la aplicación al ASM.

    // Create the request to send to the ApplicationsManager 
    SubmitApplicationRequest appRequest = 
        Records.newRecord(SubmitApplicationRequest.class);
    appRequest.setApplicationSubmissionContext(appContext);

    // Submit the application to the ApplicationsManager
    // Ignore the response as either a valid response object is returned on 
    // success or an exception thrown to denote the failure
    applicationsManager.submitApplication(appRequest);

En este punto, el ResourceManager ha aceptado la aplicación y continuará con el proceso de asignación del contenedor con las especificaciones requeridas y cargando el  ApplicationMaster en dicho contenedor.

Ahora, el cliente puede solicitar al RM un informe sobre el estado de la aplicación haciendo uso de ClientRMProtocol#getApplicationReport. Hay que recordar que si el AM lo permite se puede solicitar dicha información directamente a él.

      GetApplicationReportRequest reportRequest = 
          Records.newRecord(GetApplicationReportRequest.class);
      reportRequest.setApplicationId(appId);
      GetApplicationReportResponse reportResponse = 
          applicationsManager.getApplicationReport(reportRequest);
      ApplicationReport report = reportResponse.getApplicationReport();

El ApplicationReport recibido del ResourceManager contiene los siguientes datos:

  • Información general de aplicación: ApplicationId, cola a la que se envió la aplicación, el usuario y la hora de inicio de la aplicación.
  • Detalles del ApplicationMaster: El servidor en el que el AM está funcionando, el puerto RPC (si los hay) en los que se escucha las peticiones de los clientes y las claves o tokens que el cliente necesita para comunicarse con el AM.
  • Seguimiento de la aplicación: Si la aplicación admite algún tipo de seguimiento de los progresos, el cliente puede monitorizar el proceso mediante una URL de seguimiento y haciendo uso de ApplicationReport#getTrackingUrl.
  • ApplicationStatus: El estado de la aplicación desde el RM mediante ApplicationReport#getYarnApplicationState. Si el YarnApplicationState se establece terminado (FINISHED), el cliente debe comprobar si ha finalizado con éxito o con error (ApplicationReport#getFinalApplicationStatus). En caso de error, se puede obtener información adicional sobre el fallo mediante ApplicationReport#getDiagnostics.

Es posible que en ciertas situaciones el cliente decida matar la aplicación. En este caso se puede hacer uso de ClientRMProtocol#forceKillApplication para enviar una señal kill al AM a través del RM (también se podría hacer directamente sobre AM si se implementa de ese modo).

    KillApplicationRequest killRequest = Records.newRecord(KillApplicationRequest.class);                
    killRequest.setApplicationId(appId);
    applicationsManager.forceKillApplication(killRequest);

ApplicationMaster

ApplicationMaster es el propietario de la tarea (Job). Será cargado por el ResourceManager y, a través del cliente, se le proporcionará toda la información y recursos necesarios para supervisar y completar el Job. El AM puede compartir un mismo servidor físico con otros contenedores de aplicaciones y, cuando se levanta el AM, tiene a su disposición diversa información a través del entorno.

Todas las interacciones con el ResourceManager requieren un ApplicationAttemptId (puede haber varios intentos por solicitud en caso de fallos). El ApplicationAttemptId se puede obtener del contenedor del AM.

 Map<String, String> envs = System.getenv();
    String containerIdString = 
        envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
    if (containerIdString == null) {
      // container id should always be set in the env by the framework 
      throw new IllegalArgumentException(
          "ContainerId not set in the environment");
    }
    ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
    ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();

Una vez se ha incializado completamente el ApplicationMaster, necesita registrarse con el ResourceManager vía AMRMProtocol#registerApplicationMaster. El AM siempre se comunica con el RM mediante el Scheduler.

// Connect to the Scheduler of the ResourceManager. 
    YarnConfiguration yarnConf = new YarnConfiguration(conf);
    InetSocketAddress rmAddress = 
        NetUtils.createSocketAddr(yarnConf.get(
            YarnConfiguration.RM_SCHEDULER_ADDRESS,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));           
    LOG.info("Connecting to ResourceManager at " + rmAddress);
    AMRMProtocol resourceManager = 
        (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf);

    // Register the AM with the RM
    // Set the required info into the registration request: 
    // ApplicationAttemptId, 
    // host on which the app master is running
    // rpc port on which the app master accepts requests from the client 
    // tracking url for the client to track app master progress
    RegisterApplicationMasterRequest appMasterRequest = 
        Records.newRecord(RegisterApplicationMasterRequest.class);
    appMasterRequest.setApplicationAttemptId(appAttemptID);     
    appMasterRequest.setHost(appMasterHostname);
    appMasterRequest.setRpcPort(appMasterRpcPort);
    appMasterRequest.setTrackingUrl(appMasterTrackingUrl);

    // The registration response is useful as it provides information about the 
    // cluster. 
    // Similar to the GetNewApplicationResponse in the client, it provides 
    // information about the min/mx resource capabilities of the cluster that 
    // would be needed by the ApplicationMaster when requesting for containers.
    RegisterApplicationMasterResponse response = 
        resourceManager.registerApplicationMaster(appMasterRequest);

El AM tiene que notificar periódicamente al RM que continúa vivo y ejecutándose correctamente. El timeout está definido en la configuración y se puede acceder a él mediante la variable YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS y su valor por defecto esta definido en YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS.

En base a los requerimientos de la tarea, el ApplicationMaster puede solicitar un conjunto de contenedores para ejecutarla. Debe utilizar la clase ResourceRequest para definir las especificaciones del contenedor:

  • Nombre del servidor: Si los contenedores deben ser alojados en un determinado servidor o en cualquiera (*) .
  • Recursos: Actualmente sólo se puede especificar los requisitos de memoria (MB).
  • Prioridad: Al solicitar contenedores se pueden especificar las prioridades. De este modo, el AM puede asignar una prioridad mayor a los contenedores encargados de la función Map y menor a los de Reduce.
// Resource Request
    ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class);

    // setup requirements for hosts 
    // whether a particular rack/host is needed 
    // useful for applications that are sensitive
    // to data locality 
    rsrcRequest.setHostName("*");

    // set the priority for the request
    Priority pri = Records.newRecord(Priority.class);
    pri.setPriority(requestPriority);
    rsrcRequest.setPriority(pri);           

    // Set up resource type requirements
    // For now, only memory is supported so we set memory requirements
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(containerMemory);
    rsrcRequest.setCapability(capability);

    // set no. of containers needed
    // matching the specifications
    rsrcRequest.setNumContainers(numContainers);

Después de definir los requisitos del contenedor, el AM tiene que construir un objeto AllocateRequest que enviará al RM. Este objeto está formado por:

  • Contenedores solicitados: Tanto las especificaciones del contenedor como el número de ellos que el AM va a solicitar al RM.
  • Contenedores liberados: Pueden darse situaciones en las que el ApplicationMaster puede haber pedido más contenedores de los que necesita o, debido a errores, decide utilizar otros contenedores destinados al mismo. En este caso, el AM libera estos contendores y los devuelve al RM para que pueda reasignarlos a otras aplicaciones.
  • ResponseID: El identificador de la respuesta.
  • Información del progreso: El ApplicationMaster puede enviar actualizaciones el progreso de la ejecución (entre 0 y 1 siendo 1 completado) al ResourceManager.
List<ResourceRequest> requestedContainers;
    List<ContainerId> releasedContainers    
    AllocateRequest req = Records.newRecord(AllocateRequest.class);

    // The response id set in the request will be sent back in 
    // the response so that the ApplicationMaster can 
    // match it to its original ask and act appropriately.
    req.setResponseId(rmRequestID);

    // Set ApplicationAttemptId 
    req.setApplicationAttemptId(appAttemptID);

    // Add the list of containers being asked for 
    req.addAllAsks(requestedContainers);

    // If the ApplicationMaster has no need for certain 
    // containers due to over-allocation or for any other
    // reason, it can release them back to the ResourceManager
    req.addAllReleases(releasedContainers);

    // Assuming the ApplicationMaster can track its progress
    req.setProgress(currentProgress);

    AllocateResponse allocateResponse = resourceManager.allocate(req);

El AllocateResponse devuelto al ResourceManager (mediante AMResponse) proporciona la siguiente información:

  • Señal de reinicio: Esto es útil para escenarios en los que el AM puede perder la sincronización con el RM.
  • Contenedores asignados: Los contenedores asignados al AM.
  • Espacio: El espacio en el cluster para los recursos.
  • Contenedores completados: Cuando el AM desencadena la carga del contenedor asignado recibirá una señal o actualización del RM cuando el contenedor se haya completado. Con esto, el AM puede mirar el estado del contenedor y tomar las medidas que considere necesarias en caso de que, por ejemplo, se haya producido un error .

Un detalle a tener en cuenta es que los contenedores no se asignan inmediatamente al AM sino que éste debe continuar solicitando los contenedores que le queden pendientes. Una vez que la petición de asignación ha sido enviada, el AM ubicará los contenedores basandose en la capacidad de cluste las prioridades or las politicas de distribución.

   // Get AMResponse from AllocateResponse 
    AMResponse amResp = allocateResponse.getAMResponse();                       

    // Retrieve list of allocated containers from the response 
    // and on each allocated container, lets assume we are launching 
    // the same job.
    List<Container> allocatedContainers = amResp.getAllocatedContainers();
    for (Container allocatedContainer : allocatedContainers) {
      LOG.info("Launching shell command on a new container."
          + ", containerId=" + allocatedContainer.getId()
          + ", containerNode=" + allocatedContainer.getNodeId().getHost() 
          + ":" + allocatedContainer.getNodeId().getPort()
          + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
          + ", containerState" + allocatedContainer.getState()
          + ", containerResourceMemory"  
          + allocatedContainer.getResource().getMemory());

      // Launch and start the container on a separate thread to keep the main 
      // thread unblocked as all containers may not be allocated at one go.
      LaunchContainerRunnable runnableLaunchContainer = 
          new LaunchContainerRunnable(allocatedContainer);
      Thread launchThread = new Thread(runnableLaunchContainer);        
      launchThreads.add(launchThread);
      launchThread.start();
    }

    // Check what the current available resources in the cluster are
    Resource availableResources = amResp.getAvailableResources();
    // Based on this information, an ApplicationMaster can make appropriate 
    // decisions

    // Check the completed containers
    // Let's assume we are keeping a count of total completed containers, 
    // containers that failed and ones that completed successfully.                     
    List<ContainerStatus> completedContainers = 
        amResp.getCompletedContainersStatuses();
    for (ContainerStatus containerStatus : completedContainers) {                               
      LOG.info("Got container status for containerID= " 
          + containerStatus.getContainerId()
          + ", state=" + containerStatus.getState()     
          + ", exitStatus=" + containerStatus.getExitStatus() 
          + ", diagnostics=" + containerStatus.getDiagnostics());

      int exitStatus = containerStatus.getExitStatus();
      if (0 != exitStatus) {
        // container failed 
        // -100 is a special case where the container 
        // was aborted/pre-empted for some reason 
        if (-100 != exitStatus) {
          // application job on container returned a non-zero exit code
          // counts as completed 
          numCompletedContainers.incrementAndGet();
          numFailedContainers.incrementAndGet();                                                        
        }
        else { 
          // something else bad happened 
          // app job did not complete for some reason 
          // we should re-try as the container was lost for some reason
          // decrementing the requested count so that we ask for an
          // additional one in the next allocate call.          
          numRequestedContainers.decrementAndGet();
          // we do not need to release the container as that has already 
          // been done by the ResourceManager/NodeManager. 
        }
        }
        else { 
          // nothing to do 
          // container completed successfully 
          numCompletedContainers.incrementAndGet();
          numSuccessfulContainers.incrementAndGet();
        }
      }
    }

Una vez que un contenedor ha sido asignado al ApplicationMaster, éste  ha ubicado el contenedor, éste tiene que continuar realizando un proceso similar al del cliente especificando el ContainerLaunchContext para la tarea que va a ser ejecutada. Cuando este ContainerLaunchContext es definido, el AM se comunica con el ContainerManager para iniciar el contenedor asignado.

//Assuming an allocated Container obtained from AMResponse 
    Container container;   
    // Connect to ContainerManager on the allocated container 
    String cmIpPortStr = container.getNodeId().getHost() + ":" 
        + container.getNodeId().getPort();              
    InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);               
    ContainerManager cm = 
        (ContainerManager)rpc.getProxy(ContainerManager.class, cmAddress, conf);     

    // Now we setup a ContainerLaunchContext  
    ContainerLaunchContext ctx = 
        Records.newRecord(ContainerLaunchContext.class);

    ctx.setContainerId(container.getId());
    ctx.setResource(container.getResource());

    try {
      ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
    } catch (IOException e) {
      LOG.info(
          "Getting current user failed when trying to launch the container",
          + e.getMessage());
    }

    // Set the environment 
    Map<String, String> unixEnv;
    // Setup the required env. 
    // Please note that the launched container does not inherit 
    // the environment of the ApplicationMaster so all the 
    // necessary environment settings will need to be re-setup 
    // for this allocated container.      
    ctx.setEnvironment(unixEnv);

    // Set the local resources 
    Map<String, LocalResource> localResources = 
        new HashMap<String, LocalResource>();
    // Again, the local resources from the ApplicationMaster is not copied over 
    // by default to the allocated container. Thus, it is the responsibility 
          // of the ApplicationMaster to setup all the necessary local resources 
          // needed by the job that will be executed on the allocated container. 

    // Assume that we are executing a shell script on the allocated container 
    // and the shell script's location in the filesystem is known to us. 
    Path shellScriptPath; 
    LocalResource shellRsrc = Records.newRecord(LocalResource.class);
    shellRsrc.setType(LocalResourceType.FILE);
    shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);          
    shellRsrc.setResource(
        ConverterUtils.getYarnUrlFromURI(new URI(shellScriptPath)));
    shellRsrc.setTimestamp(shellScriptPathTimestamp);
    shellRsrc.setSize(shellScriptPathLen);
    localResources.put("MyExecShell.sh", shellRsrc);

    ctx.setLocalResources(localResources);                      

    // Set the necessary command to execute on the allocated container 
    String command = "/bin/sh ./MyExecShell.sh"
        + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
        + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";

    List<String> commands = new ArrayList<String>();
    commands.add(command);
    ctx.setCommands(commands);

    // Send the start request to the ContainerManager
    StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
    startReq.setContainerLaunchContext(ctx);
    cm.startContainer(startReq);

Como ya ha comentado anteriormente, el AM irá recibiendo actualizaciones de los contenedores completados como parte de la respuesta originada por AMRMProtocol#allocate. Igualmente, el AM puede monitorizar los contenedores de forma proactiva preguntando sobre el estado de estos al ContainerManager.

    GetContainerStatusRequest statusReq = 
        Records.newRecord(GetContainerStatusRequest.class);
    statusReq.setContainerId(container.getId());
    GetContainerStatusResponse statusResp = cm.getContainerStatus(statusReq);
    LOG.info("Container Status"
        + ", id=" + container.getId()
        + ", status=" + statusResp.getStatus());

Conclusión

Quizás ahora mismo estén un poco aturdidos por la gran cantidad de información que se ha dado en este tutorial, pero con una segunda y detallada lectura se puede comprender muy bien la finalidad de cada uno de los códigos y su razón de ser. En el siguiente y último de este tutorial vamos a realizar una ejecución de este entorno y se va a ver como distribuir a todos los nodos la aplicación  de ejemplo que fue creada en el tutorial de MapReduce.

Como de constumbre, espero que os haya servido de ayuda y para cualquier duda, mejora o corrección no duden en dejar un comentario.

Happy Minds!!!

No hay artículos relacionados.

Share on FacebookTweet about this on TwitterShare on LinkedInShare on RedditShare on Google+Digg thisShare on TumblrPin on PinterestBuffer this pagePrint this pageEmail this to someone