001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.applications.distributedshell;
020
021import java.io.IOException;
022import java.nio.ByteBuffer;
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.Vector;
030
031import org.apache.commons.cli.CommandLine;
032import org.apache.commons.cli.GnuParser;
033import org.apache.commons.cli.HelpFormatter;
034import org.apache.commons.cli.Option;
035import org.apache.commons.cli.Options;
036import org.apache.commons.cli.ParseException;
037import org.apache.commons.io.IOUtils;
038import org.apache.commons.lang.StringUtils;
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.apache.hadoop.classification.InterfaceAudience;
042import org.apache.hadoop.classification.InterfaceStability;
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.fs.FSDataOutputStream;
045import org.apache.hadoop.fs.FileStatus;
046import org.apache.hadoop.fs.FileSystem;
047import org.apache.hadoop.fs.Path;
048import org.apache.hadoop.fs.permission.FsPermission;
049import org.apache.hadoop.io.DataOutputBuffer;
050import org.apache.hadoop.security.Credentials;
051import org.apache.hadoop.security.UserGroupInformation;
052import org.apache.hadoop.security.token.Token;
053import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
054import org.apache.hadoop.yarn.api.ApplicationConstants;
055import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
056import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
057import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
058import org.apache.hadoop.yarn.api.records.ApplicationId;
059import org.apache.hadoop.yarn.api.records.ApplicationReport;
060import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
061import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
062import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
063import org.apache.hadoop.yarn.api.records.LocalResource;
064import org.apache.hadoop.yarn.api.records.LocalResourceType;
065import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
066import org.apache.hadoop.yarn.api.records.NodeReport;
067import org.apache.hadoop.yarn.api.records.NodeState;
068import org.apache.hadoop.yarn.api.records.Priority;
069import org.apache.hadoop.yarn.api.records.QueueACL;
070import org.apache.hadoop.yarn.api.records.QueueInfo;
071import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
072import org.apache.hadoop.yarn.api.records.Resource;
073import org.apache.hadoop.yarn.api.records.URL;
074import org.apache.hadoop.yarn.api.records.YarnApplicationState;
075import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
076import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
077import org.apache.hadoop.yarn.client.api.TimelineClient;
078import org.apache.hadoop.yarn.client.api.YarnClient;
079import org.apache.hadoop.yarn.client.api.YarnClientApplication;
080import org.apache.hadoop.yarn.client.util.YarnClientUtils;
081import org.apache.hadoop.yarn.conf.YarnConfiguration;
082import org.apache.hadoop.yarn.exceptions.YarnException;
083import org.apache.hadoop.yarn.util.ConverterUtils;
084import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
085
086/**
087 * Client for Distributed Shell application submission to YARN.
088 * 
089 * <p> The distributed shell client allows an application master to be launched that in turn would run 
090 * the provided shell command on a set of containers. </p>
091 * 
092 * <p>This client is meant to act as an example on how to write yarn-based applications. </p>
093 * 
094 * <p> To submit an application, a client first needs to connect to the <code>ResourceManager</code> 
095 * aka ApplicationsManager or ASM via the {@link ApplicationClientProtocol}. The {@link ApplicationClientProtocol} 
096 * provides a way for the client to get access to cluster information and to request for a
 * new {@link ApplicationId}. </p>
098 * 
099 * <p> For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}. 
100 * The {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId} 
101 * and application name, the priority assigned to the application and the queue
102 * to which this application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext}
103 * also defines the {@link ContainerLaunchContext} which describes the <code>Container</code> with which 
104 * the {@link ApplicationMaster} is launched. </p>
105 * 
106 * <p> The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the 
107 * {@link ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available 
108 * and the environment to be set for the {@link ApplicationMaster} and the commands to be executed to run the 
 * {@link ApplicationMaster}. </p>
110 * 
111 * <p> Using the {@link ApplicationSubmissionContext}, the client submits the application to the 
112 * <code>ResourceManager</code> and then monitors the application by requesting the <code>ResourceManager</code> 
113 * for an {@link ApplicationReport} at regular time intervals. In case of the application taking too long, the client 
114 * kills the application by submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>. </p>
115 *
116 */
117@InterfaceAudience.Public
118@InterfaceStability.Unstable
119public class Client {
120
  private static final Log LOG = LogFactory.getLog(Client.class);

  // Configuration used to create the YarnClient (constructor) and the
  // FileSystem that application files are copied to (run()).
  private Configuration conf;
  // Initialized in the constructor, started in run().
  private YarnClient yarnClient;
  // Application name registered with the RM; set from -appname
  // (init() defaults it to "DistributedShell").
  private String appName = "";
  // App master priority (-priority).
  private int amPriority = 0;
  // RM queue the application is submitted to (-queue, init() default "default").
  private String amQueue = "";
  // Memory in MB requested for the App Master container (-master_memory);
  // capped to the cluster's maximum in run().
  private long amMemory = 100;
  // Virtual cores requested for the App Master container (-master_vcores);
  // capped to the cluster's maximum in run().
  private int amVCores = 1;

  // Local path of the application master jar (-jar, mandatory).
  private String appMasterJar = "";
  // Fully-qualified main class launched in the App Master container.
  private final String appMasterMainClass;

  // Shell command to be executed (-shell_command); mutually exclusive with
  // shellScriptPath.
  private String shellCommand = "";
  // Local path of the shell script (-shell_script); mutually exclusive with
  // shellCommand.
  private String shellScriptPath = "";
  // Args to be passed to the shell command (-shell_args).
  private String[] shellArgs = new String[] {};
  // Env variables for the shell command, parsed from -shell_env key=value
  // pairs; a bare key maps to "".
  private Map<String, String> shellEnv = new HashMap<String, String>();
  // Priority of the shell command containers; forwarded to the AM as --priority.
  private int shellCmdPriority = 0;

  // Memory in MB to request for each container running the shell command.
  private int containerMemory = 10;
  // Virtual cores to request for each container running the shell command.
  private int containerVirtualCores = 1;
  // Number of containers in which the shell command is executed (must be >= 1).
  private int numContainers = 1;
  // Node label expression set on the submission context; null means "not
  // specified", in which case it is left unset.
  private String nodeLabelExpression = null;

  // Local log4j.properties file (-log_properties); if non-empty it is added
  // to the AM's local resources, and ./log4j.properties is on its classpath.
  private String log4jPropFile = "";

  // Start time for client (captured at construction).
  private final long clientStartTime = System.currentTimeMillis();
  // Timeout threshold for client in milliseconds (-timeout). Per the class
  // docs the app is killed after this interval expires; the enforcing code
  // is presumably in the monitoring loop later in this file.
  private long clientTimeout = 600000;

  // Whether to keep containers across application attempts.
  private boolean keepContainers = false;

  // Attempt-failure validity interval in ms; negative means "not set" and
  // it is only applied to the submission context when >= 0.
  private long attemptFailuresValidityInterval = -1;

  // Container retry options, each pre-formatted as "--option value", which
  // are appended verbatim to the App Master command line.
  private Vector<CharSequence> containerRetryOptions = new Vector<>(5);

  // Debug flag; when set, --debug is passed to the App Master.
  // NOTE(review): package-private (no modifier) — possibly accessed from
  // elsewhere in the package; confirm before narrowing.
  boolean debugFlag = false;

  // Timeline domain ID (-domain).
  private String domainId = null;

  // Whether to create the timeline domain of the given ID (-create).
  private boolean toCreateDomain = false;

  // Timeline domain reader access control (-view_acls, only read with -domain).
  private String viewACLs = null;

  // Timeline domain writer access control (-modify_acls, only read with -domain).
  private String modifyACLs = null;

  // Flow tags; a null name/version or a run ID of 0 means "not specified"
  // and no corresponding application tag is added.
  private String flowName = null;
  private String flowVersion = null;
  private long flowRunId = 0L;

  // Command line options
  private Options opts;

  // Local-resource names used when shipping files to the App Master.
  private static final String shellCommandPath = "shellCommands";
  private static final String shellArgsPath = "shellArgs";
  private static final String appMasterJarPath = "AppMaster.jar";
  // Hardcoded resource name for the custom log_properties file; matches the
  // ./log4j.properties classpath entry.
  private static final String log4jPath = "log4j.properties";

  // File name of the shell script once copied under
  // <home>/<appName>/<appId>/ on the cluster filesystem.
  public static final String SCRIPT_PATH = "ExecScript";
206
207  /**
208   * @param args Command line arguments 
209   */
210  public static void main(String[] args) {
211    boolean result = false;
212    try {
213      Client client = new Client();
214      LOG.info("Initializing Client");
215      try {
216        boolean doRun = client.init(args);
217        if (!doRun) {
218          System.exit(0);
219        }
220      } catch (IllegalArgumentException e) {
221        System.err.println(e.getLocalizedMessage());
222        client.printUsage();
223        System.exit(-1);
224      }
225      result = client.run();
226    } catch (Throwable t) {
227      LOG.fatal("Error running Client", t);
228      System.exit(1);
229    }
230    if (result) {
231      LOG.info("Application completed successfully");
232      System.exit(0);                   
233    } 
234    LOG.error("Application failed to complete successfully");
235    System.exit(2);
236  }
237
238  /**
239   */
240  public Client(Configuration conf) throws Exception  {
241    this(
242      "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster",
243      conf);
244  }
245
246  Client(String appMasterMainClass, Configuration conf) {
247    this.conf = conf;
248    this.appMasterMainClass = appMasterMainClass;
249    yarnClient = YarnClient.createYarnClient();
250    yarnClient.init(conf);
251    opts = new Options();
252    opts.addOption("appname", true, "Application Name. Default value - DistributedShell");
253    opts.addOption("priority", true, "Application Priority. Default 0");
254    opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
255    opts.addOption("timeout", true, "Application timeout in milliseconds");
256    opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
257    opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master");
258    opts.addOption("jar", true, "Jar file containing the application master");
259    opts.addOption("shell_command", true, "Shell command to be executed by " +
260        "the Application Master. Can only specify either --shell_command " +
261        "or --shell_script");
262    opts.addOption("shell_script", true, "Location of the shell script to be " +
263        "executed. Can only specify either --shell_command or --shell_script");
264    opts.addOption("shell_args", true, "Command line args for the shell script." +
265        "Multiple args can be separated by empty space.");
266    opts.getOption("shell_args").setArgs(Option.UNLIMITED_VALUES);
267    opts.addOption("shell_env", true,
268        "Environment for shell script. Specified as env_key=env_val pairs");
269    opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
270    opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
271    opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
272    opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
273    opts.addOption("log_properties", true, "log4j.properties file");
274    opts.addOption("keep_containers_across_application_attempts", false,
275      "Flag to indicate whether to keep containers across application attempts." +
276      " If the flag is true, running containers will not be killed when" +
277      " application attempt fails and these containers will be retrieved by" +
278      " the new application attempt ");
279    opts.addOption("attempt_failures_validity_interval", true,
280      "when attempt_failures_validity_interval in milliseconds is set to > 0," +
281      "the failure number will not take failures which happen out of " +
282      "the validityInterval into failure count. " +
283      "If failure count reaches to maxAppAttempts, " +
284      "the application will be failed.");
285    opts.addOption("debug", false, "Dump out debug information");
286    opts.addOption("domain", true, "ID of the timeline domain where the "
287        + "timeline entities will be put");
288    opts.addOption("view_acls", true, "Users and groups that allowed to "
289        + "view the timeline entities in the given domain");
290    opts.addOption("modify_acls", true, "Users and groups that allowed to "
291        + "modify the timeline entities in the given domain");
292    opts.addOption("create", false, "Flag to indicate whether to create the "
293        + "domain specified with -domain.");
294    opts.addOption("flow_name", true, "Flow name which the distributed shell "
295        + "app belongs to");
296    opts.addOption("flow_version", true, "Flow version which the distributed "
297        + "shell app belongs to");
298    opts.addOption("flow_run_id", true, "Flow run ID which the distributed "
299        + "shell app belongs to");
300    opts.addOption("help", false, "Print usage");
301    opts.addOption("node_label_expression", true,
302        "Node label expression to determine the nodes"
303            + " where all the containers of this application"
304            + " will be allocated, \"\" means containers"
305            + " can be allocated anywhere, if you don't specify the option,"
306            + " default node_label_expression of queue will be used.");
307    opts.addOption("container_retry_policy", true,
308        "Retry policy when container fails to run, "
309            + "0: NEVER_RETRY, 1: RETRY_ON_ALL_ERRORS, "
310            + "2: RETRY_ON_SPECIFIC_ERROR_CODES");
311    opts.addOption("container_retry_error_codes", true,
312        "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODES, error "
313            + "codes is specified with this option, "
314            + "e.g. --container_retry_error_codes 1,2,3");
315    opts.addOption("container_max_retries", true,
316        "If container could retry, it specifies max retires");
317    opts.addOption("container_retry_interval", true,
318        "Interval between each retry, unit is milliseconds");
319  }
320
321  /**
322   */
323  public Client() throws Exception  {
324    this(new YarnConfiguration());
325  }
326
327  /**
328   * Helper function to print out usage
329   */
330  private void printUsage() {
331    new HelpFormatter().printHelp("Client", opts);
332  }
333
334  /**
335   * Parse command line options
336   * @param args Parsed command line options 
337   * @return Whether the init was successful to run the client
338   * @throws ParseException
339   */
340  public boolean init(String[] args) throws ParseException {
341
342    CommandLine cliParser = new GnuParser().parse(opts, args);
343
344    if (args.length == 0) {
345      throw new IllegalArgumentException("No args specified for client to initialize");
346    }
347
348    if (cliParser.hasOption("log_properties")) {
349      String log4jPath = cliParser.getOptionValue("log_properties");
350      try {
351        Log4jPropertyHelper.updateLog4jConfiguration(Client.class, log4jPath);
352      } catch (Exception e) {
353        LOG.warn("Can not set up custom log4j properties. " + e);
354      }
355    }
356
357    if (cliParser.hasOption("help")) {
358      printUsage();
359      return false;
360    }
361
362    if (cliParser.hasOption("debug")) {
363      debugFlag = true;
364
365    }
366
367    if (cliParser.hasOption("keep_containers_across_application_attempts")) {
368      LOG.info("keep_containers_across_application_attempts");
369      keepContainers = true;
370    }
371
372    appName = cliParser.getOptionValue("appname", "DistributedShell");
373    amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
374    amQueue = cliParser.getOptionValue("queue", "default");
375    amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "100"));
376    amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1"));
377
378    if (amMemory < 0) {
379      throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
380          + " Specified memory=" + amMemory);
381    }
382    if (amVCores < 0) {
383      throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting."
384          + " Specified virtual cores=" + amVCores);
385    }
386
387    if (!cliParser.hasOption("jar")) {
388      throw new IllegalArgumentException("No jar file specified for application master");
389    }           
390
391    appMasterJar = cliParser.getOptionValue("jar");
392
393    if (!cliParser.hasOption("shell_command") && !cliParser.hasOption("shell_script")) {
394      throw new IllegalArgumentException(
395          "No shell command or shell script specified to be executed by application master");
396    } else if (cliParser.hasOption("shell_command") && cliParser.hasOption("shell_script")) {
397      throw new IllegalArgumentException("Can not specify shell_command option " +
398          "and shell_script option at the same time");
399    } else if (cliParser.hasOption("shell_command")) {
400      shellCommand = cliParser.getOptionValue("shell_command");
401    } else {
402      shellScriptPath = cliParser.getOptionValue("shell_script");
403    }
404    if (cliParser.hasOption("shell_args")) {
405      shellArgs = cliParser.getOptionValues("shell_args");
406    }
407    if (cliParser.hasOption("shell_env")) { 
408      String envs[] = cliParser.getOptionValues("shell_env");
409      for (String env : envs) {
410        env = env.trim();
411        int index = env.indexOf('=');
412        if (index == -1) {
413          shellEnv.put(env, "");
414          continue;
415        }
416        String key = env.substring(0, index);
417        String val = "";
418        if (index < (env.length()-1)) {
419          val = env.substring(index+1);
420        }
421        shellEnv.put(key, val);
422      }
423    }
424    shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0"));
425
426    containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
427    containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1"));
428    numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
429    
430
431    if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) {
432      throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified,"
433          + " exiting."
434          + " Specified containerMemory=" + containerMemory
435          + ", containerVirtualCores=" + containerVirtualCores
436          + ", numContainer=" + numContainers);
437    }
438    
439    nodeLabelExpression = cliParser.getOptionValue("node_label_expression", null);
440
441    clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000"));
442
443    attemptFailuresValidityInterval =
444        Long.parseLong(cliParser.getOptionValue(
445          "attempt_failures_validity_interval", "-1"));
446
447    log4jPropFile = cliParser.getOptionValue("log_properties", "");
448
449    // Get timeline domain options
450    if (cliParser.hasOption("domain")) {
451      domainId = cliParser.getOptionValue("domain");
452      toCreateDomain = cliParser.hasOption("create");
453      if (cliParser.hasOption("view_acls")) {
454        viewACLs = cliParser.getOptionValue("view_acls");
455      }
456      if (cliParser.hasOption("modify_acls")) {
457        modifyACLs = cliParser.getOptionValue("modify_acls");
458      }
459    }
460
461    // Get container retry options
462    if (cliParser.hasOption("container_retry_policy")) {
463      containerRetryOptions.add("--container_retry_policy "
464          + cliParser.getOptionValue("container_retry_policy"));
465    }
466    if (cliParser.hasOption("container_retry_error_codes")) {
467      containerRetryOptions.add("--container_retry_error_codes "
468          + cliParser.getOptionValue("container_retry_error_codes"));
469    }
470    if (cliParser.hasOption("container_max_retries")) {
471      containerRetryOptions.add("--container_max_retries "
472          + cliParser.getOptionValue("container_max_retries"));
473    }
474    if (cliParser.hasOption("container_retry_interval")) {
475      containerRetryOptions.add("--container_retry_interval "
476          + cliParser.getOptionValue("container_retry_interval"));
477    }
478
479    if (cliParser.hasOption("flow_name")) {
480      flowName = cliParser.getOptionValue("flow_name");
481    }
482    if (cliParser.hasOption("flow_version")) {
483      flowVersion = cliParser.getOptionValue("flow_version");
484    }
485    if (cliParser.hasOption("flow_run_id")) {
486      try {
487        flowRunId = Long.parseLong(cliParser.getOptionValue("flow_run_id"));
488      } catch (NumberFormatException e) {
489        throw new IllegalArgumentException(
490            "Flow run is not a valid long value", e);
491      }
492    }
493    return true;
494  }
495
496  /**
497   * Main run function for the client
498   * @return true if application completed successfully
499   * @throws IOException
500   * @throws YarnException
501   */
502  public boolean run() throws IOException, YarnException {
503
504    LOG.info("Running Client");
505    yarnClient.start();
506
507    YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
508    LOG.info("Got Cluster metric info from ASM" 
509        + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
510
511    List<NodeReport> clusterNodeReports = yarnClient.getNodeReports(
512        NodeState.RUNNING);
513    LOG.info("Got Cluster node info from ASM");
514    for (NodeReport node : clusterNodeReports) {
515      LOG.info("Got node report from ASM for"
516          + ", nodeId=" + node.getNodeId() 
517          + ", nodeAddress=" + node.getHttpAddress()
518          + ", nodeRackName=" + node.getRackName()
519          + ", nodeNumContainers=" + node.getNumContainers());
520    }
521
522    QueueInfo queueInfo = yarnClient.getQueueInfo(this.amQueue);
523    LOG.info("Queue info"
524        + ", queueName=" + queueInfo.getQueueName()
525        + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
526        + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
527        + ", queueApplicationCount=" + queueInfo.getApplications().size()
528        + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());               
529
530    List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo();
531    for (QueueUserACLInfo aclInfo : listAclInfo) {
532      for (QueueACL userAcl : aclInfo.getUserAcls()) {
533        LOG.info("User ACL Info for Queue"
534            + ", queueName=" + aclInfo.getQueueName()                   
535            + ", userAcl=" + userAcl.name());
536      }
537    }           
538
539    if (domainId != null && domainId.length() > 0 && toCreateDomain) {
540      prepareTimelineDomain();
541    }
542
543    // Get a new application id
544    YarnClientApplication app = yarnClient.createApplication();
545    GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
546    // TODO get min/max resource capabilities from RM and change memory ask if needed
547    // If we do not have min/max, we may not be able to correctly request 
548    // the required resources from the RM for the app master
549    // Memory ask has to be a multiple of min and less than max. 
550    // Dump out information about cluster capability as seen by the resource manager
551    long maxMem = appResponse.getMaximumResourceCapability().getMemorySize();
552    LOG.info("Max mem capability of resources in this cluster " + maxMem);
553
554    // A resource ask cannot exceed the max. 
555    if (amMemory > maxMem) {
556      LOG.info("AM memory specified above max threshold of cluster. Using max value."
557          + ", specified=" + amMemory
558          + ", max=" + maxMem);
559      amMemory = maxMem;
560    }                           
561
562    int maxVCores = appResponse.getMaximumResourceCapability().getVirtualCores();
563    LOG.info("Max virtual cores capability of resources in this cluster " + maxVCores);
564    
565    if (amVCores > maxVCores) {
566      LOG.info("AM virtual cores specified above max threshold of cluster. " 
567          + "Using max value." + ", specified=" + amVCores 
568          + ", max=" + maxVCores);
569      amVCores = maxVCores;
570    }
571    
572    // set the application name
573    ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
574    ApplicationId appId = appContext.getApplicationId();
575
576    appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
577    appContext.setApplicationName(appName);
578
579    if (attemptFailuresValidityInterval >= 0) {
580      appContext
581        .setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
582    }
583
584    Set<String> tags = new HashSet<String>();
585    if (flowName != null) {
586      tags.add(TimelineUtils.generateFlowNameTag(flowName));
587    }
588    if (flowVersion != null) {
589      tags.add(TimelineUtils.generateFlowVersionTag(flowVersion));
590    }
591    if (flowRunId != 0) {
592      tags.add(TimelineUtils.generateFlowRunIdTag(flowRunId));
593    }
594    appContext.setApplicationTags(tags);
595
596    // set local resources for the application master
597    // local files or archives as needed
598    // In this scenario, the jar file for the application master is part of the local resources                 
599    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
600
601    LOG.info("Copy App Master jar from local filesystem and add to local environment");
602    // Copy the application master jar to the filesystem 
603    // Create a local resource to point to the destination jar path 
604    FileSystem fs = FileSystem.get(conf);
605    addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(),
606        localResources, null);
607
608    // Set the log4j properties if needed 
609    if (!log4jPropFile.isEmpty()) {
610      addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(),
611          localResources, null);
612    }                   
613
614    // The shell script has to be made available on the final container(s)
615    // where it will be executed. 
616    // To do this, we need to first copy into the filesystem that is visible 
617    // to the yarn framework. 
618    // We do not need to set this as a local resource for the application 
619    // master as the application master does not need it.               
620    String hdfsShellScriptLocation = ""; 
621    long hdfsShellScriptLen = 0;
622    long hdfsShellScriptTimestamp = 0;
623    if (!shellScriptPath.isEmpty()) {
624      Path shellSrc = new Path(shellScriptPath);
625      String shellPathSuffix =
626          appName + "/" + appId.toString() + "/" + SCRIPT_PATH;
627      Path shellDst =
628          new Path(fs.getHomeDirectory(), shellPathSuffix);
629      fs.copyFromLocalFile(false, true, shellSrc, shellDst);
630      hdfsShellScriptLocation = shellDst.toUri().toString(); 
631      FileStatus shellFileStatus = fs.getFileStatus(shellDst);
632      hdfsShellScriptLen = shellFileStatus.getLen();
633      hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
634    }
635
636    if (!shellCommand.isEmpty()) {
637      addToLocalResources(fs, null, shellCommandPath, appId.toString(),
638          localResources, shellCommand);
639    }
640
641    if (shellArgs.length > 0) {
642      addToLocalResources(fs, null, shellArgsPath, appId.toString(),
643          localResources, StringUtils.join(shellArgs, " "));
644    }
645
646    // Set the necessary security tokens as needed
647    //amContainer.setContainerTokens(containerToken);
648
649    // Set the env variables to be setup in the env where the application master will be run
650    LOG.info("Set the environment for the application master");
651    Map<String, String> env = new HashMap<String, String>();
652
653    // put location of shell script into env
654    // using the env info, the application master will create the correct local resource for the 
655    // eventual containers that will be launched to execute the shell scripts
656    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
657    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
658    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
659    if (domainId != null && domainId.length() > 0) {
660      env.put(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN, domainId);
661    }
662
663    // Add AppMaster.jar location to classpath          
664    // At some point we should not be required to add 
665    // the hadoop specific classpaths to the env. 
666    // It should be provided out of the box. 
667    // For now setting all required classpaths including
668    // the classpath to "." for the application jar
669    StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())
670      .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
671    for (String c : conf.getStrings(
672        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
673        YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
674      classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
675      classPathEnv.append(c.trim());
676    }
677    classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append(
678      "./log4j.properties");
679
680    // add the runtime classpath needed for tests to work
681    if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
682      classPathEnv.append(':');
683      classPathEnv.append(System.getProperty("java.class.path"));
684    }
685
686    env.put("CLASSPATH", classPathEnv.toString());
687
688    // Set the necessary command to execute the application master 
689    Vector<CharSequence> vargs = new Vector<CharSequence>(30);
690
691    // Set java executable command 
692    LOG.info("Setting up app master command");
693    vargs.add(Environment.JAVA_HOME.$$() + "/bin/java");
694    // Set Xmx based on am memory size
695    vargs.add("-Xmx" + amMemory + "m");
696    // Set class name 
697    vargs.add(appMasterMainClass);
698    // Set params for Application Master
699    vargs.add("--container_memory " + String.valueOf(containerMemory));
700    vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
701    vargs.add("--num_containers " + String.valueOf(numContainers));
702    if (null != nodeLabelExpression) {
703      appContext.setNodeLabelExpression(nodeLabelExpression);
704    }
705    vargs.add("--priority " + String.valueOf(shellCmdPriority));
706
707    for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
708      vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
709    }
710    if (debugFlag) {
711      vargs.add("--debug");
712    }
713
714    vargs.addAll(containerRetryOptions);
715
716    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
717    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
718
719    // Get final commmand
720    StringBuilder command = new StringBuilder();
721    for (CharSequence str : vargs) {
722      command.append(str).append(" ");
723    }
724
725    LOG.info("Completed setting up app master command " + command.toString());
726    List<String> commands = new ArrayList<String>();
727    commands.add(command.toString());           
728
729    // Set up the container launch context for the application master
730    ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
731      localResources, env, commands, null, null, null);
732
733    // Set up resource type requirements
734    // For now, both memory and vcores are supported, so we set memory and 
735    // vcores requirements
736    Resource capability = Resource.newInstance(amMemory, amVCores);
737    appContext.setResource(capability);
738
739    // Service data is a binary blob that can be passed to the application
740    // Not needed in this scenario
741    // amContainer.setServiceData(serviceData);
742
743    // Setup security tokens
744    if (UserGroupInformation.isSecurityEnabled()) {
745      // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
746      Credentials credentials = new Credentials();
747      String tokenRenewer = YarnClientUtils.getRmPrincipal(conf);
748      if (tokenRenewer == null || tokenRenewer.length() == 0) {
749        throw new IOException(
750          "Can't get Master Kerberos principal for the RM to use as renewer");
751      }
752
753      // For now, only getting tokens for the default file-system.
754      final Token<?> tokens[] =
755          fs.addDelegationTokens(tokenRenewer, credentials);
756      if (tokens != null) {
757        for (Token<?> token : tokens) {
758          LOG.info("Got dt for " + fs.getUri() + "; " + token);
759        }
760      }
761      DataOutputBuffer dob = new DataOutputBuffer();
762      credentials.writeTokenStorageToStream(dob);
763      ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
764      amContainer.setTokens(fsTokens);
765    }
766
767    appContext.setAMContainerSpec(amContainer);
768
769    // Set the priority for the application master
770    // TODO - what is the range for priority? how to decide? 
771    Priority pri = Priority.newInstance(amPriority);
772    appContext.setPriority(pri);
773
774    // Set the queue to which this application is to be submitted in the RM
775    appContext.setQueue(amQueue);
776
777    // Submit the application to the applications manager
778    // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
779    // Ignore the response as either a valid response object is returned on success 
780    // or an exception thrown to denote some form of a failure
781    LOG.info("Submitting application to ASM");
782
783    yarnClient.submitApplication(appContext);
784
785    // TODO
786    // Try submitting the same request again
787    // app submission failure?
788
789    // Monitor the application
790    return monitorApplication(appId);
791
792  }
793
794  /**
795   * Monitor the submitted application for completion. 
796   * Kill application if time expires. 
797   * @param appId Application Id of application to be monitored
798   * @return true if application completed successfully
799   * @throws YarnException
800   * @throws IOException
801   */
802  private boolean monitorApplication(ApplicationId appId)
803      throws YarnException, IOException {
804
805    while (true) {
806
807      // Check app status every 1 second.
808      try {
809        Thread.sleep(1000);
810      } catch (InterruptedException e) {
811        LOG.debug("Thread sleep in monitoring loop interrupted");
812      }
813
814      // Get application report for the appId we are interested in 
815      ApplicationReport report = yarnClient.getApplicationReport(appId);
816
817      LOG.info("Got application report from ASM for"
818          + ", appId=" + appId.getId()
819          + ", clientToAMToken=" + report.getClientToAMToken()
820          + ", appDiagnostics=" + report.getDiagnostics()
821          + ", appMasterHost=" + report.getHost()
822          + ", appQueue=" + report.getQueue()
823          + ", appMasterRpcPort=" + report.getRpcPort()
824          + ", appStartTime=" + report.getStartTime()
825          + ", yarnAppState=" + report.getYarnApplicationState().toString()
826          + ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
827          + ", appTrackingUrl=" + report.getTrackingUrl()
828          + ", appUser=" + report.getUser());
829
830      YarnApplicationState state = report.getYarnApplicationState();
831      FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
832      if (YarnApplicationState.FINISHED == state) {
833        if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
834          LOG.info("Application has completed successfully. Breaking monitoring loop");
835          return true;        
836        }
837        else {
838          LOG.info("Application did finished unsuccessfully."
839              + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
840              + ". Breaking monitoring loop");
841          return false;
842        }                         
843      }
844      else if (YarnApplicationState.KILLED == state     
845          || YarnApplicationState.FAILED == state) {
846        LOG.info("Application did not finish."
847            + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
848            + ". Breaking monitoring loop");
849        return false;
850      }                 
851
852      if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
853        LOG.info("Reached client specified timeout for application. Killing application");
854        forceKillApplication(appId);
855        return false;                           
856      }
857    }                   
858
859  }
860
861  /**
862   * Kill a submitted application by sending a call to the ASM
863   * @param appId Application Id to be killed. 
864   * @throws YarnException
865   * @throws IOException
866   */
867  private void forceKillApplication(ApplicationId appId)
868      throws YarnException, IOException {
869    // TODO clarify whether multiple jobs with the same app id can be submitted and be running at 
870    // the same time. 
871    // If yes, can we kill a particular attempt only?
872
873    // Response can be ignored as it is non-null on success or 
874    // throws an exception in case of failures
875    yarnClient.killApplication(appId);  
876  }
877
878  private void addToLocalResources(FileSystem fs, String fileSrcPath,
879      String fileDstPath, String appId, Map<String, LocalResource> localResources,
880      String resources) throws IOException {
881    String suffix =
882        appName + "/" + appId + "/" + fileDstPath;
883    Path dst =
884        new Path(fs.getHomeDirectory(), suffix);
885    if (fileSrcPath == null) {
886      FSDataOutputStream ostream = null;
887      try {
888        ostream = FileSystem
889            .create(fs, dst, new FsPermission((short) 0710));
890        ostream.writeUTF(resources);
891      } finally {
892        IOUtils.closeQuietly(ostream);
893      }
894    } else {
895      fs.copyFromLocalFile(new Path(fileSrcPath), dst);
896    }
897    FileStatus scFileStatus = fs.getFileStatus(dst);
898    LocalResource scRsrc =
899        LocalResource.newInstance(
900            URL.fromURI(dst.toUri()),
901            LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
902            scFileStatus.getLen(), scFileStatus.getModificationTime());
903    localResources.put(fileDstPath, scRsrc);
904  }
905
906  private void prepareTimelineDomain() {
907    TimelineClient timelineClient = null;
908    if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
909        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
910      timelineClient = TimelineClient.createTimelineClient();
911      timelineClient.init(conf);
912      timelineClient.start();
913    } else {
914      LOG.warn("Cannot put the domain " + domainId +
915          " because the timeline service is not enabled");
916      return;
917    }
918    try {
919      //TODO: we need to check and combine the existing timeline domain ACLs,
920      //but let's do it once we have client java library to query domains.
921      TimelineDomain domain = new TimelineDomain();
922      domain.setId(domainId);
923      domain.setReaders(
924          viewACLs != null && viewACLs.length() > 0 ? viewACLs : " ");
925      domain.setWriters(
926          modifyACLs != null && modifyACLs.length() > 0 ? modifyACLs : " ");
927      timelineClient.putDomain(domain);
928      LOG.info("Put the timeline domain: " +
929          TimelineUtils.dumpTimelineRecordtoJSON(domain));
930    } catch (Exception e) {
931      LOG.error("Error when putting the timeline domain", e);
932    } finally {
933      timelineClient.stop();
934    }
935  }
936}