001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.applications.distributedshell;
020
021import java.io.BufferedReader;
022import java.io.DataInputStream;
023import java.io.File;
024import java.io.FileInputStream;
025import java.io.IOException;
026import java.io.StringReader;
027import java.lang.reflect.UndeclaredThrowableException;
028import java.net.URI;
029import java.net.URISyntaxException;
030import java.nio.ByteBuffer;
031import java.security.PrivilegedExceptionAction;
032import java.util.ArrayList;
033import java.util.Collections;
034import java.util.HashMap;
035import java.util.HashSet;
036import java.util.Iterator;
037import java.util.List;
038import java.util.Map;
039import java.util.Set;
040import java.util.Vector;
041import java.util.concurrent.ConcurrentHashMap;
042import java.util.concurrent.ConcurrentMap;
043import java.util.concurrent.atomic.AtomicInteger;
044
045import org.apache.commons.cli.CommandLine;
046import org.apache.commons.cli.GnuParser;
047import org.apache.commons.cli.HelpFormatter;
048import org.apache.commons.cli.Options;
049import org.apache.commons.cli.ParseException;
050import org.apache.commons.logging.Log;
051import org.apache.commons.logging.LogFactory;
052import org.apache.hadoop.classification.InterfaceAudience;
053import org.apache.hadoop.classification.InterfaceAudience.Private;
054import org.apache.hadoop.classification.InterfaceStability;
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.fs.FileSystem;
057import org.apache.hadoop.fs.Path;
058import org.apache.hadoop.io.DataOutputBuffer;
059import org.apache.hadoop.io.IOUtils;
060import org.apache.hadoop.net.NetUtils;
061import org.apache.hadoop.security.Credentials;
062import org.apache.hadoop.security.UserGroupInformation;
063import org.apache.hadoop.security.token.Token;
064import org.apache.hadoop.util.ExitUtil;
065import org.apache.hadoop.util.Shell;
066import org.apache.hadoop.yarn.api.ApplicationConstants;
067import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
068import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
069import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
070import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
071import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
072import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
073import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
074import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
075import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
076import org.apache.hadoop.yarn.api.records.Container;
077import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
078import org.apache.hadoop.yarn.api.records.ContainerId;
079import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
080import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
081import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
082import org.apache.hadoop.yarn.api.records.ContainerState;
083import org.apache.hadoop.yarn.api.records.ContainerStatus;
084import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
085import org.apache.hadoop.yarn.api.records.LocalResource;
086import org.apache.hadoop.yarn.api.records.LocalResourceType;
087import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
088import org.apache.hadoop.yarn.api.records.NodeReport;
089import org.apache.hadoop.yarn.api.records.Priority;
090import org.apache.hadoop.yarn.api.records.Resource;
091import org.apache.hadoop.yarn.api.records.ResourceRequest;
092import org.apache.hadoop.yarn.api.records.URL;
093import org.apache.hadoop.yarn.api.records.UpdatedContainer;
094import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
095import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
096import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
097import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
098import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
099import org.apache.hadoop.yarn.client.api.TimelineClient;
100import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
101import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
102import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
103import org.apache.hadoop.yarn.conf.YarnConfiguration;
104import org.apache.hadoop.yarn.exceptions.YarnException;
105import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
106import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
107import org.apache.log4j.LogManager;
108
109import com.google.common.annotations.VisibleForTesting;
110import com.sun.jersey.api.client.ClientHandlerException;
111
112/**
113 * An ApplicationMaster for executing shell commands on a set of launched
114 * containers using the YARN framework.
115 * 
116 * <p>
117 * This class is meant to act as an example on how to write yarn-based
118 * application masters.
119 * </p>
120 * 
121 * <p>
122 * The ApplicationMaster is started on a container by the
123 * <code>ResourceManager</code>'s launcher. The first thing that the
124 * <code>ApplicationMaster</code> needs to do is to connect and register itself
125 * with the <code>ResourceManager</code>. The registration sets up information
126 * within the <code>ResourceManager</code> regarding what host:port the
127 * ApplicationMaster is listening on to provide any form of functionality to a
128 * client as well as a tracking url that a client can use to keep track of
 * status/job history if needed. However, the distributed shell supports
 * neither a tracking URL nor an appMasterHost:appMasterRpcPort endpoint.
131 * </p>
132 * 
133 * <p>
134 * The <code>ApplicationMaster</code> needs to send a heartbeat to the
135 * <code>ResourceManager</code> at regular intervals to inform the
 * <code>ResourceManager</code> that it is up and alive. Each
 * {@link ApplicationMasterProtocol#allocate} call from the
 * <code>ApplicationMaster</code> to the <code>ResourceManager</code> acts as a
 * heartbeat.
 * </p>
 * 
140 * <p>
141 * For the actual handling of the job, the <code>ApplicationMaster</code> has to
142 * request the <code>ResourceManager</code> via {@link AllocateRequest} for the
143 * required no. of containers using {@link ResourceRequest} with the necessary
144 * resource specifications such as node location, computational
145 * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
146 * responds with an {@link AllocateResponse} that informs the
147 * <code>ApplicationMaster</code> of the set of newly allocated containers,
148 * completed containers as well as current state of available resources.
149 * </p>
150 * 
151 * <p>
152 * For each allocated container, the <code>ApplicationMaster</code> can then set
153 * up the necessary launch context via {@link ContainerLaunchContext} to specify
154 * the allocated container id, local resources required by the executable, the
155 * environment to be setup for the executable, commands to execute, etc. and
156 * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to
157 * launch and execute the defined commands on the given allocated container.
158 * </p>
159 * 
160 * <p>
161 * The <code>ApplicationMaster</code> can monitor the launched container by
162 * either querying the <code>ResourceManager</code> using
163 * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via
164 * the {@link ContainerManagementProtocol} by querying for the status of the allocated
165 * container's {@link ContainerId}.
166 *
167 * <p>
168 * After the job has been completed, the <code>ApplicationMaster</code> has to
169 * send a {@link FinishApplicationMasterRequest} to the
170 * <code>ResourceManager</code> to inform it that the
171 * <code>ApplicationMaster</code> has been completed.
172 */
173@InterfaceAudience.Public
174@InterfaceStability.Unstable
175public class ApplicationMaster {
176
  // Logger for the application master.
  private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);

  /**
   * Event types emitted by the distributed shell AM; the application attempt
   * start/end events are published to the timeline service.
   */
  @VisibleForTesting
  @Private
  public static enum DSEvent {
    DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END
  }
  
  /**
   * Entity types used by the distributed shell AM (presumably for timeline
   * entities; usage is not visible here).
   */
  @VisibleForTesting
  @Private
  public static enum DSEntity {
    DS_APP_ATTEMPT, DS_CONTAINER
  }

  // Name of an environment variable; presumably set per launched container
  // together with yarnShellIdCounter below (usage not visible here).
  private static final String YARN_SHELL_ID = "YARN_SHELL_ID";

  // Configuration (a YarnConfiguration created by the constructor)
  private Configuration conf;

  // Asynchronous client used to communicate with the ResourceManager
  @SuppressWarnings("rawtypes")
  private AMRMClientAsync amRMClient;

  // In both secure and non-secure modes, this points to the job-submitter.
  @VisibleForTesting
  UserGroupInformation appSubmitterUgi;

  // Asynchronous client used to communicate with the NodeManagers
  private NMClientAsync nmClientAsync;
  // Callback handler that processes responses from the NodeManagers
  private NMCallbackHandler containerListener;

  // Application attempt id; derived in init() from the container id in the
  // environment, or from the app_attempt_id option (testing only)
  @VisibleForTesting
  protected ApplicationAttemptId appAttemptID;

  // TODO
  // For status update for clients - yet to be implemented
  // Hostname of the container running the AM (set in run())
  private String appMasterHostname = "";
  // Port on which the app master listens for status updates from clients
  // (-1: no RPC endpoint is offered)
  private int appMasterRpcPort = -1;
  // Tracking url to which app master publishes info for clients to monitor
  private String appMasterTrackingUrl = "";

  // Whether timeline service v2 is enabled (set in init())
  private boolean timelineServiceV2 = false;

  // App Master configuration
  // No. of containers to run shell command on
  @VisibleForTesting
  protected int numTotalContainers = 1;
  // Memory (in MB) to request for each container running the shell command
  private long containerMemory = 10;
  // VirtualCores to request for each container running the shell command
  private int containerVirtualCores = 1;
  // Priority of the container requests
  private int requestPriority;

  // Counter for completed containers (complete denotes successful or failed)
  private AtomicInteger numCompletedContainers = new AtomicInteger();
  // Number of containers the RM has allocated to us so far, including
  // running containers recovered from previous attempts
  @VisibleForTesting
  protected AtomicInteger numAllocatedContainers = new AtomicInteger();
  // Count of failed containers
  private AtomicInteger numFailedContainers = new AtomicInteger();
  // Count of containers already requested from the RM.
  // Needed because once containers are requested we should not request them
  // again; only request more if the original requirement changes.
  @VisibleForTesting
  protected AtomicInteger numRequestedContainers = new AtomicInteger();

  // Shell command to be executed (read from the shellCommands file)
  private String shellCommand = "";
  // Args to be passed to the shell command (read from the shellArgs file)
  private String shellArgs = "";
  // Env variables to be set up for the shell command (from --shell_env)
  private Map<String, String> shellEnv = new HashMap<String, String>();

  // Location of the shell script in the file system (obtained from the
  // environment)
  private String scriptPath = "";
  // Script timestamp, needed for creating a local resource
  private long shellScriptPathTimestamp = 0;
  // Script file length, needed for the local resource
  private long shellScriptPathLen = 0;

  // Container retry options (parsed from the container_retry_* options)
  private ContainerRetryPolicy containerRetryPolicy =
      ContainerRetryPolicy.NEVER_RETRY;
  // null unless --container_retry_error_codes was given
  private Set<Integer> containerRetryErrorCodes = null;
  private int containerMaxRetries = 0;
  // Retry interval in milliseconds. NOTE: the field name is misspelled
  // ("containr"); kept as-is because other code may reference it.
  private int containrRetryInterval = 0;

  // Timeline domain ID (null unless provided through the environment)
  private String domainId = null;

  // Hardcoded path to shell script in launch container's local env
  private static final String EXEC_SHELL_STRING_PATH = Client.SCRIPT_PATH
      + ".sh";
  private static final String EXEC_BAT_SCRIPT_STRING_PATH = Client.SCRIPT_PATH
      + ".bat";

  // Hardcoded path to custom log4j properties
  private static final String log4jPath = "log4j.properties";

  // Files in the working directory that hold the shell command and its args
  private static final String shellCommandPath = "shellCommands";
  private static final String shellArgsPath = "shellArgs";

  // Checked by finish(): once true, finish() stops waiting for containers
  private volatile boolean done;

  // Serialized credentials (token storage) captured in run()
  private ByteBuffer allTokens;

  // Launch threads; joined (with a timeout) in finish()
  private List<Thread> launchThreads = new ArrayList<Thread>();

  // Timeline Client (null when the timeline service is disabled)
  @VisibleForTesting
  TimelineClient timelineClient;
  static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS";
  static final String APPID_TIMELINE_FILTER_NAME = "appId";
  static final String USER_TIMELINE_FILTER_NAME = "user";

  // Command prefixes, presumably used to run the shell command on Linux and
  // Windows respectively (usage not visible here)
  private final String linux_bash_command = "bash";
  private final String windows_command = "cmd /c";

  // Presumably the next value assigned to YARN_SHELL_ID -- TODO confirm
  private int yarnShellIdCounter = 1;

  // Ids of containers launched for this application, including running
  // containers recovered from previous attempts in run(). Backed by a
  // ConcurrentHashMap, so it is safe for concurrent access.
  @VisibleForTesting
  protected final Set<ContainerId> launchedContainers =
      Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
308
309  /**
310   * @param args Command line args
311   */
312  public static void main(String[] args) {
313    boolean result = false;
314    try {
315      ApplicationMaster appMaster = new ApplicationMaster();
316      LOG.info("Initializing ApplicationMaster");
317      boolean doRun = appMaster.init(args);
318      if (!doRun) {
319        System.exit(0);
320      }
321      appMaster.run();
322      result = appMaster.finish();
323    } catch (Throwable t) {
324      LOG.fatal("Error running ApplicationMaster", t);
325      LogManager.shutdown();
326      ExitUtil.terminate(1, t);
327    }
328    if (result) {
329      LOG.info("Application Master completed successfully. exiting");
330      System.exit(0);
331    } else {
332      LOG.info("Application Master failed. exiting");
333      System.exit(2);
334    }
335  }
336
337  /**
338   * Dump out contents of $CWD and the environment to stdout for debugging
339   */
340  private void dumpOutDebugInfo() {
341
342    LOG.info("Dump debug output");
343    Map<String, String> envs = System.getenv();
344    for (Map.Entry<String, String> env : envs.entrySet()) {
345      LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
346      System.out.println("System env: key=" + env.getKey() + ", val="
347          + env.getValue());
348    }
349
350    BufferedReader buf = null;
351    try {
352      String lines = Shell.WINDOWS ? Shell.execCommand("cmd", "/c", "dir") :
353        Shell.execCommand("ls", "-al");
354      buf = new BufferedReader(new StringReader(lines));
355      String line = "";
356      while ((line = buf.readLine()) != null) {
357        LOG.info("System CWD content: " + line);
358        System.out.println("System CWD content: " + line);
359      }
360    } catch (IOException e) {
361      e.printStackTrace();
362    } finally {
363      IOUtils.cleanup(LOG, buf);
364    }
365  }
366
367  public ApplicationMaster() {
368    // Set up the configuration
369    conf = new YarnConfiguration();
370  }
371
372  /**
373   * Parse command line options
374   *
375   * @param args Command line args
376   * @return Whether init successful and run should be invoked
377   * @throws ParseException
378   * @throws IOException
379   */
380  public boolean init(String[] args) throws ParseException, IOException {
381    Options opts = new Options();
382    opts.addOption("app_attempt_id", true,
383        "App Attempt ID. Not to be used unless for testing purposes");
384    opts.addOption("shell_env", true,
385        "Environment for shell script. Specified as env_key=env_val pairs");
386    opts.addOption("container_memory", true,
387        "Amount of memory in MB to be requested to run the shell command");
388    opts.addOption("container_vcores", true,
389        "Amount of virtual cores to be requested to run the shell command");
390    opts.addOption("num_containers", true,
391        "No. of containers on which the shell command needs to be executed");
392    opts.addOption("priority", true, "Application Priority. Default 0");
393    opts.addOption("container_retry_policy", true,
394        "Retry policy when container fails to run, "
395            + "0: NEVER_RETRY, 1: RETRY_ON_ALL_ERRORS, "
396            + "2: RETRY_ON_SPECIFIC_ERROR_CODES");
397    opts.addOption("container_retry_error_codes", true,
398        "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODES, error "
399            + "codes is specified with this option, "
400            + "e.g. --container_retry_error_codes 1,2,3");
401    opts.addOption("container_max_retries", true,
402        "If container could retry, it specifies max retires");
403    opts.addOption("container_retry_interval", true,
404        "Interval between each retry, unit is milliseconds");
405    opts.addOption("debug", false, "Dump out debug information");
406
407    opts.addOption("help", false, "Print usage");
408    CommandLine cliParser = new GnuParser().parse(opts, args);
409
410    if (args.length == 0) {
411      printUsage(opts);
412      throw new IllegalArgumentException(
413          "No args specified for application master to initialize");
414    }
415
416    //Check whether customer log4j.properties file exists
417    if (fileExist(log4jPath)) {
418      try {
419        Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class,
420            log4jPath);
421      } catch (Exception e) {
422        LOG.warn("Can not set up custom log4j properties. " + e);
423      }
424    }
425
426    if (cliParser.hasOption("help")) {
427      printUsage(opts);
428      return false;
429    }
430
431    if (cliParser.hasOption("debug")) {
432      dumpOutDebugInfo();
433    }
434
435    Map<String, String> envs = System.getenv();
436
437    if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
438      if (cliParser.hasOption("app_attempt_id")) {
439        String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
440        appAttemptID = ApplicationAttemptId.fromString(appIdStr);
441      } else {
442        throw new IllegalArgumentException(
443            "Application Attempt Id not set in the environment");
444      }
445    } else {
446      ContainerId containerId = ContainerId.fromString(envs
447          .get(Environment.CONTAINER_ID.name()));
448      appAttemptID = containerId.getApplicationAttemptId();
449    }
450
451    if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
452      throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
453          + " not set in the environment");
454    }
455    if (!envs.containsKey(Environment.NM_HOST.name())) {
456      throw new RuntimeException(Environment.NM_HOST.name()
457          + " not set in the environment");
458    }
459    if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) {
460      throw new RuntimeException(Environment.NM_HTTP_PORT
461          + " not set in the environment");
462    }
463    if (!envs.containsKey(Environment.NM_PORT.name())) {
464      throw new RuntimeException(Environment.NM_PORT.name()
465          + " not set in the environment");
466    }
467
468    LOG.info("Application master for app" + ", appId="
469        + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
470        + appAttemptID.getApplicationId().getClusterTimestamp()
471        + ", attemptId=" + appAttemptID.getAttemptId());
472
473    if (!fileExist(shellCommandPath)
474        && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) {
475      throw new IllegalArgumentException(
476          "No shell command or shell script specified to be executed by application master");
477    }
478
479    if (fileExist(shellCommandPath)) {
480      shellCommand = readContent(shellCommandPath);
481    }
482
483    if (fileExist(shellArgsPath)) {
484      shellArgs = readContent(shellArgsPath);
485    }
486
487    if (cliParser.hasOption("shell_env")) {
488      String shellEnvs[] = cliParser.getOptionValues("shell_env");
489      for (String env : shellEnvs) {
490        env = env.trim();
491        int index = env.indexOf('=');
492        if (index == -1) {
493          shellEnv.put(env, "");
494          continue;
495        }
496        String key = env.substring(0, index);
497        String val = "";
498        if (index < (env.length() - 1)) {
499          val = env.substring(index + 1);
500        }
501        shellEnv.put(key, val);
502      }
503    }
504
505    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
506      scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
507
508      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
509        shellScriptPathTimestamp = Long.parseLong(envs
510            .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
511      }
512      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
513        shellScriptPathLen = Long.parseLong(envs
514            .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
515      }
516      if (!scriptPath.isEmpty()
517          && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
518        LOG.error("Illegal values in env for shell script path" + ", path="
519            + scriptPath + ", len=" + shellScriptPathLen + ", timestamp="
520            + shellScriptPathTimestamp);
521        throw new IllegalArgumentException(
522            "Illegal values in env for shell script path");
523      }
524    }
525
526    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) {
527      domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN);
528    }
529
530    containerMemory = Integer.parseInt(cliParser.getOptionValue(
531        "container_memory", "10"));
532    containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
533        "container_vcores", "1"));
534    numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
535        "num_containers", "1"));
536    if (numTotalContainers == 0) {
537      throw new IllegalArgumentException(
538          "Cannot run distributed shell with no containers");
539    }
540    requestPriority = Integer.parseInt(cliParser
541        .getOptionValue("priority", "0"));
542
543    containerRetryPolicy = ContainerRetryPolicy.values()[
544        Integer.parseInt(cliParser.getOptionValue(
545            "container_retry_policy", "0"))];
546    if (cliParser.hasOption("container_retry_error_codes")) {
547      containerRetryErrorCodes = new HashSet<>();
548      for (String errorCode :
549          cliParser.getOptionValue("container_retry_error_codes").split(",")) {
550        containerRetryErrorCodes.add(Integer.parseInt(errorCode));
551      }
552    }
553    containerMaxRetries = Integer.parseInt(
554        cliParser.getOptionValue("container_max_retries", "0"));
555    containrRetryInterval = Integer.parseInt(cliParser.getOptionValue(
556        "container_retry_interval", "0"));
557
558    if (YarnConfiguration.timelineServiceEnabled(conf)) {
559      timelineServiceV2 = YarnConfiguration.timelineServiceV2Enabled(conf);
560    } else {
561      timelineClient = null;
562      LOG.warn("Timeline service is not enabled");
563    }
564
565    return true;
566  }
567
568  /**
569   * Helper function to print usage
570   *
571   * @param opts Parsed command line options
572   */
573  private void printUsage(Options opts) {
574    new HelpFormatter().printHelp("ApplicationMaster", opts);
575  }
576
577  /**
578   * Main run function for the application master
579   *
580   * @throws YarnException
581   * @throws IOException
582   */
583  @SuppressWarnings({ "unchecked" })
584  public void run() throws YarnException, IOException, InterruptedException {
585    LOG.info("Starting ApplicationMaster");
586
587    // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
588    // are marked as LimitedPrivate
589    Credentials credentials =
590        UserGroupInformation.getCurrentUser().getCredentials();
591    DataOutputBuffer dob = new DataOutputBuffer();
592    credentials.writeTokenStorageToStream(dob);
593    // Now remove the AM->RM token so that containers cannot access it.
594    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
595    LOG.info("Executing with tokens:");
596    while (iter.hasNext()) {
597      Token<?> token = iter.next();
598      LOG.info(token);
599      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
600        iter.remove();
601      }
602    }
603    allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
604
605    // Create appSubmitterUgi and add original tokens to it
606    String appSubmitterUserName =
607        System.getenv(ApplicationConstants.Environment.USER.name());
608    appSubmitterUgi =
609        UserGroupInformation.createRemoteUser(appSubmitterUserName);
610    appSubmitterUgi.addCredentials(credentials);
611
612    AMRMClientAsync.AbstractCallbackHandler allocListener =
613        new RMCallbackHandler();
614    amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
615    amRMClient.init(conf);
616    amRMClient.start();
617
618    containerListener = createNMCallbackHandler();
619    nmClientAsync = new NMClientAsyncImpl(containerListener);
620    nmClientAsync.init(conf);
621    nmClientAsync.start();
622
623    startTimelineClient(conf);
624    if (timelineServiceV2) {
625      // need to bind timelineClient
626      amRMClient.registerTimelineClient(timelineClient);
627    }
628    if(timelineClient != null) {
629      if (timelineServiceV2) {
630        publishApplicationAttemptEventOnTimelineServiceV2(
631            DSEvent.DS_APP_ATTEMPT_START);
632      } else {
633        publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
634            DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
635      }
636    }
637
638    // Setup local RPC Server to accept status requests directly from clients
639    // TODO need to setup a protocol for client to be able to communicate to
640    // the RPC server
641    // TODO use the rpc port info to register with the RM for the client to
642    // send requests to this app master
643
644    // Register self with ResourceManager
645    // This will start heartbeating to the RM
646    appMasterHostname = NetUtils.getHostname();
647    RegisterApplicationMasterResponse response = amRMClient
648        .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
649            appMasterTrackingUrl);
650    // Dump out information about cluster capability as seen by the
651    // resource manager
652    long maxMem = response.getMaximumResourceCapability().getMemorySize();
653    LOG.info("Max mem capability of resources in this cluster " + maxMem);
654    
655    int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
656    LOG.info("Max vcores capability of resources in this cluster " + maxVCores);
657
658    // A resource ask cannot exceed the max.
659    if (containerMemory > maxMem) {
660      LOG.info("Container memory specified above max threshold of cluster."
661          + " Using max value." + ", specified=" + containerMemory + ", max="
662          + maxMem);
663      containerMemory = maxMem;
664    }
665
666    if (containerVirtualCores > maxVCores) {
667      LOG.info("Container virtual cores specified above max threshold of cluster."
668          + " Using max value." + ", specified=" + containerVirtualCores + ", max="
669          + maxVCores);
670      containerVirtualCores = maxVCores;
671    }
672
673    List<Container> previousAMRunningContainers =
674        response.getContainersFromPreviousAttempts();
675    LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
676      + " previous attempts' running containers on AM registration.");
677    for(Container container: previousAMRunningContainers) {
678      launchedContainers.add(container.getId());
679    }
680    numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
681
682
683    int numTotalContainersToRequest =
684        numTotalContainers - previousAMRunningContainers.size();
685    // Setup ask for containers from RM
686    // Send request for containers to RM
687    // Until we get our fully allocated quota, we keep on polling RM for
688    // containers
689    // Keep looping until all the containers are launched and shell script
690    // executed on them ( regardless of success/failure).
691    for (int i = 0; i < numTotalContainersToRequest; ++i) {
692      ContainerRequest containerAsk = setupContainerAskForRM();
693      amRMClient.addContainerRequest(containerAsk);
694    }
695    numRequestedContainers.set(numTotalContainers);
696  }
697
698  @VisibleForTesting
699  void startTimelineClient(final Configuration conf)
700      throws YarnException, IOException, InterruptedException {
701    try {
702      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
703        @Override
704        public Void run() throws Exception {
705          if (YarnConfiguration.timelineServiceEnabled(conf)) {
706            // Creating the Timeline Client
707            if (timelineServiceV2) {
708              timelineClient = TimelineClient.createTimelineClient(
709                  appAttemptID.getApplicationId());
710              LOG.info("Timeline service V2 client is enabled");
711            } else {
712              timelineClient = TimelineClient.createTimelineClient();
713              LOG.info("Timeline service V1 client is enabled");
714            }
715            timelineClient.init(conf);
716            timelineClient.start();
717          } else {
718            timelineClient = null;
719            LOG.warn("Timeline service is not enabled");
720          }
721          return null;
722        }
723      });
724    } catch (UndeclaredThrowableException e) {
725      throw new YarnException(e.getCause());
726    }
727  }
728
729  @VisibleForTesting
730  NMCallbackHandler createNMCallbackHandler() {
731    return new NMCallbackHandler(this);
732  }
733
734  @VisibleForTesting
735  protected boolean finish() {
736    // wait for completion.
737    while (!done
738        && (numCompletedContainers.get() != numTotalContainers)) {
739      try {
740        Thread.sleep(200);
741      } catch (InterruptedException ex) {}
742    }
743
744    if (timelineClient != null) {
745      if (timelineServiceV2) {
746        publishApplicationAttemptEventOnTimelineServiceV2(
747            DSEvent.DS_APP_ATTEMPT_END);
748      } else {
749        publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
750            DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
751      }
752    }
753
754    // Join all launched threads
755    // needed for when we time out
756    // and we need to release containers
757    for (Thread launchThread : launchThreads) {
758      try {
759        launchThread.join(10000);
760      } catch (InterruptedException e) {
761        LOG.info("Exception thrown in thread join: " + e.getMessage());
762        e.printStackTrace();
763      }
764    }
765
766    // When the application completes, it should stop all running containers
767    LOG.info("Application completed. Stopping running containers");
768    nmClientAsync.stop();
769
770    // When the application completes, it should send a finish application
771    // signal to the RM
772    LOG.info("Application completed. Signalling finish to RM");
773
774    FinalApplicationStatus appStatus;
775    String appMessage = null;
776    boolean success = true;
777    if (numCompletedContainers.get() - numFailedContainers.get()
778        >= numTotalContainers) {
779      appStatus = FinalApplicationStatus.SUCCEEDED;
780    } else {
781      appStatus = FinalApplicationStatus.FAILED;
782      appMessage = "Diagnostics." + ", total=" + numTotalContainers
783          + ", completed=" + numCompletedContainers.get() + ", allocated="
784          + numAllocatedContainers.get() + ", failed="
785          + numFailedContainers.get();
786      LOG.info(appMessage);
787      success = false;
788    }
789    try {
790      amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
791    } catch (YarnException ex) {
792      LOG.error("Failed to unregister application", ex);
793    } catch (IOException e) {
794      LOG.error("Failed to unregister application", e);
795    }
796    
797    amRMClient.stop();
798
799    // Stop Timeline Client
800    if(timelineClient != null) {
801      timelineClient.stop();
802    }
803
804    return success;
805  }
806
807  @VisibleForTesting
808  class RMCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler {
809    @SuppressWarnings("unchecked")
810    @Override
811    public void onContainersCompleted(List<ContainerStatus> completedContainers) {
812      LOG.info("Got response from RM for container ask, completedCnt="
813          + completedContainers.size());
814      for (ContainerStatus containerStatus : completedContainers) {
815        LOG.info(appAttemptID + " got container status for containerID="
816            + containerStatus.getContainerId() + ", state="
817            + containerStatus.getState() + ", exitStatus="
818            + containerStatus.getExitStatus() + ", diagnostics="
819            + containerStatus.getDiagnostics());
820
821        // non complete containers should not be here
822        assert (containerStatus.getState() == ContainerState.COMPLETE);
823        // ignore containers we know nothing about - probably from a previous
824        // attempt
825        if (!launchedContainers.contains(containerStatus.getContainerId())) {
826          LOG.info("Ignoring completed status of "
827              + containerStatus.getContainerId()
828              + "; unknown container(probably launched by previous attempt)");
829          continue;
830        }
831
832        // increment counters for completed/failed containers
833        int exitStatus = containerStatus.getExitStatus();
834        if (0 != exitStatus) {
835          // container failed
836          if (ContainerExitStatus.ABORTED != exitStatus) {
837            // shell script failed
838            // counts as completed
839            numCompletedContainers.incrementAndGet();
840            numFailedContainers.incrementAndGet();
841          } else {
842            // container was killed by framework, possibly preempted
843            // we should re-try as the container was lost for some reason
844            numAllocatedContainers.decrementAndGet();
845            numRequestedContainers.decrementAndGet();
846            // we do not need to release the container as it would be done
847            // by the RM
848          }
849        } else {
850          // nothing to do
851          // container completed successfully
852          numCompletedContainers.incrementAndGet();
853          LOG.info("Container completed successfully." + ", containerId="
854              + containerStatus.getContainerId());
855        }
856        if(timelineClient != null) {
857          if (timelineServiceV2) {
858            publishContainerEndEventOnTimelineServiceV2(containerStatus);
859          } else {
860            publishContainerEndEvent(
861                timelineClient, containerStatus, domainId, appSubmitterUgi);
862          }
863        }
864      }
865      
866      // ask for more containers if any failed
867      int askCount = numTotalContainers - numRequestedContainers.get();
868      numRequestedContainers.addAndGet(askCount);
869
870      if (askCount > 0) {
871        for (int i = 0; i < askCount; ++i) {
872          ContainerRequest containerAsk = setupContainerAskForRM();
873          amRMClient.addContainerRequest(containerAsk);
874        }
875      }
876      
877      if (numCompletedContainers.get() == numTotalContainers) {
878        done = true;
879      }
880    }
881
882    @Override
883    public void onContainersAllocated(List<Container> allocatedContainers) {
884      LOG.info("Got response from RM for container ask, allocatedCnt="
885          + allocatedContainers.size());
886      numAllocatedContainers.addAndGet(allocatedContainers.size());
887      for (Container allocatedContainer : allocatedContainers) {
888        String yarnShellId = Integer.toString(yarnShellIdCounter);
889        yarnShellIdCounter++;
890        LOG.info("Launching shell command on a new container."
891            + ", containerId=" + allocatedContainer.getId()
892            + ", yarnShellId=" + yarnShellId
893            + ", containerNode=" + allocatedContainer.getNodeId().getHost()
894            + ":" + allocatedContainer.getNodeId().getPort()
895            + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
896            + ", containerResourceMemory"
897            + allocatedContainer.getResource().getMemorySize()
898            + ", containerResourceVirtualCores"
899            + allocatedContainer.getResource().getVirtualCores());
900        // + ", containerToken"
901        // +allocatedContainer.getContainerToken().getIdentifier().toString());
902
903        Thread launchThread = createLaunchContainerThread(allocatedContainer,
904            yarnShellId);
905
906        // launch and start the container on a separate thread to keep
907        // the main thread unblocked
908        // as all containers may not be allocated at one go.
909        launchThreads.add(launchThread);
910        launchedContainers.add(allocatedContainer.getId());
911        launchThread.start();
912      }
913    }
914
915    @Override
916    public void onContainersUpdated(
917        List<UpdatedContainer> containers) {}
918
919    @Override
920    public void onShutdownRequest() {
921      done = true;
922    }
923
924    @Override
925    public void onNodesUpdated(List<NodeReport> updatedNodes) {}
926
927    @Override
928    public float getProgress() {
929      // set progress to deliver to RM on next heartbeat
930      float progress = (float) numCompletedContainers.get()
931          / numTotalContainers;
932      return progress;
933    }
934
935    @Override
936    public void onError(Throwable e) {
937      LOG.error("Error in RMCallbackHandler: ", e);
938      done = true;
939      amRMClient.stop();
940    }
941  }
942
943  @VisibleForTesting
944  static class NMCallbackHandler extends NMClientAsync.AbstractCallbackHandler {
945
946    private ConcurrentMap<ContainerId, Container> containers =
947        new ConcurrentHashMap<ContainerId, Container>();
948    private final ApplicationMaster applicationMaster;
949
950    public NMCallbackHandler(ApplicationMaster applicationMaster) {
951      this.applicationMaster = applicationMaster;
952    }
953
954    public void addContainer(ContainerId containerId, Container container) {
955      containers.putIfAbsent(containerId, container);
956    }
957
958    @Override
959    public void onContainerStopped(ContainerId containerId) {
960      if (LOG.isDebugEnabled()) {
961        LOG.debug("Succeeded to stop Container " + containerId);
962      }
963      containers.remove(containerId);
964    }
965
966    @Override
967    public void onContainerStatusReceived(ContainerId containerId,
968        ContainerStatus containerStatus) {
969      if (LOG.isDebugEnabled()) {
970        LOG.debug("Container Status: id=" + containerId + ", status=" +
971            containerStatus);
972      }
973    }
974
975    @Override
976    public void onContainerStarted(ContainerId containerId,
977        Map<String, ByteBuffer> allServiceResponse) {
978      if (LOG.isDebugEnabled()) {
979        LOG.debug("Succeeded to start Container " + containerId);
980      }
981      Container container = containers.get(containerId);
982      if (container != null) {
983        applicationMaster.nmClientAsync.getContainerStatusAsync(
984            containerId, container.getNodeId());
985      }
986      if(applicationMaster.timelineClient != null) {
987        if (applicationMaster.timelineServiceV2) {
988          applicationMaster.publishContainerStartEventOnTimelineServiceV2(
989              container);
990        } else {
991          applicationMaster.publishContainerStartEvent(
992              applicationMaster.timelineClient, container,
993              applicationMaster.domainId, applicationMaster.appSubmitterUgi);
994        }
995      }
996    }
997
998    @Override
999    public void onContainerResourceIncreased(
1000        ContainerId containerId, Resource resource) {}
1001
1002    @Override
1003    public void onStartContainerError(ContainerId containerId, Throwable t) {
1004      LOG.error("Failed to start Container " + containerId);
1005      containers.remove(containerId);
1006      applicationMaster.numCompletedContainers.incrementAndGet();
1007      applicationMaster.numFailedContainers.incrementAndGet();
1008    }
1009
1010    @Override
1011    public void onGetContainerStatusError(
1012        ContainerId containerId, Throwable t) {
1013      LOG.error("Failed to query the status of Container " + containerId);
1014    }
1015
1016    @Override
1017    public void onStopContainerError(ContainerId containerId, Throwable t) {
1018      LOG.error("Failed to stop Container " + containerId);
1019      containers.remove(containerId);
1020    }
1021
1022    @Override
1023    public void onIncreaseContainerResourceError(
1024        ContainerId containerId, Throwable t) {}
1025
1026  }
1027
1028  /**
1029   * Thread to connect to the {@link ContainerManagementProtocol} and launch the container
1030   * that will execute the shell command.
1031   */
1032  private class LaunchContainerRunnable implements Runnable {
1033
1034    // Allocated container
1035    private Container container;
1036    private String shellId;
1037
1038    NMCallbackHandler containerListener;
1039
1040    /**
1041     * @param lcontainer Allocated container
1042     * @param containerListener Callback handler of the container
1043     */
1044    public LaunchContainerRunnable(Container lcontainer,
1045        NMCallbackHandler containerListener, String shellId) {
1046      this.container = lcontainer;
1047      this.containerListener = containerListener;
1048      this.shellId = shellId;
1049    }
1050
1051    @Override
1052    /**
1053     * Connects to CM, sets up container launch context 
1054     * for shell command and eventually dispatches the container 
1055     * start request to the CM. 
1056     */
1057    public void run() {
1058      LOG.info("Setting up container launch container for containerid="
1059          + container.getId() + " with shellid=" + shellId);
1060
1061      // Set the local resources
1062      Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
1063
1064      // The container for the eventual shell commands needs its own local
1065      // resources too.
1066      // In this scenario, if a shell script is specified, we need to have it
1067      // copied and made available to the container.
1068      if (!scriptPath.isEmpty()) {
1069        Path renamedScriptPath = null;
1070        if (Shell.WINDOWS) {
1071          renamedScriptPath = new Path(scriptPath + ".bat");
1072        } else {
1073          renamedScriptPath = new Path(scriptPath + ".sh");
1074        }
1075
1076        try {
1077          // rename the script file based on the underlying OS syntax.
1078          renameScriptFile(renamedScriptPath);
1079        } catch (Exception e) {
1080          LOG.error(
1081              "Not able to add suffix (.bat/.sh) to the shell script filename",
1082              e);
1083          // We know we cannot continue launching the container
1084          // so we should release it.
1085          numCompletedContainers.incrementAndGet();
1086          numFailedContainers.incrementAndGet();
1087          return;
1088        }
1089
1090        URL yarnUrl = null;
1091        try {
1092          yarnUrl = URL.fromURI(new URI(renamedScriptPath.toString()));
1093        } catch (URISyntaxException e) {
1094          LOG.error("Error when trying to use shell script path specified"
1095              + " in env, path=" + renamedScriptPath, e);
1096          // A failure scenario on bad input such as invalid shell script path
1097          // We know we cannot continue launching the container
1098          // so we should release it.
1099          // TODO
1100          numCompletedContainers.incrementAndGet();
1101          numFailedContainers.incrementAndGet();
1102          return;
1103        }
1104        LocalResource shellRsrc = LocalResource.newInstance(yarnUrl,
1105          LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
1106          shellScriptPathLen, shellScriptPathTimestamp);
1107        localResources.put(Shell.WINDOWS ? EXEC_BAT_SCRIPT_STRING_PATH :
1108            EXEC_SHELL_STRING_PATH, shellRsrc);
1109        shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
1110      }
1111
1112      // Set the necessary command to execute on the allocated container
1113      Vector<CharSequence> vargs = new Vector<CharSequence>(5);
1114
1115      // Set executable command
1116      vargs.add(shellCommand);
1117      // Set shell script path
1118      if (!scriptPath.isEmpty()) {
1119        vargs.add(Shell.WINDOWS ? EXEC_BAT_SCRIPT_STRING_PATH
1120            : EXEC_SHELL_STRING_PATH);
1121      }
1122
1123      // Set args for the shell command if any
1124      vargs.add(shellArgs);
1125      // Add log redirect params
1126      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
1127      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
1128
1129      // Get final commmand
1130      StringBuilder command = new StringBuilder();
1131      for (CharSequence str : vargs) {
1132        command.append(str).append(" ");
1133      }
1134
1135      List<String> commands = new ArrayList<String>();
1136      commands.add(command.toString());
1137
1138      // Set up ContainerLaunchContext, setting local resource, environment,
1139      // command and token for constructor.
1140
1141      // Note for tokens: Set up tokens for the container too. Today, for normal
1142      // shell commands, the container in distribute-shell doesn't need any
1143      // tokens. We are populating them mainly for NodeManagers to be able to
1144      // download anyfiles in the distributed file-system. The tokens are
1145      // otherwise also useful in cases, for e.g., when one is running a
1146      // "hadoop dfs" command inside the distributed shell.
1147      Map<String, String> myShellEnv = new HashMap<String, String>(shellEnv);
1148      myShellEnv.put(YARN_SHELL_ID, shellId);
1149      ContainerRetryContext containerRetryContext =
1150          ContainerRetryContext.newInstance(
1151              containerRetryPolicy, containerRetryErrorCodes,
1152              containerMaxRetries, containrRetryInterval);
1153      ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
1154        localResources, myShellEnv, commands, null, allTokens.duplicate(),
1155          null, containerRetryContext);
1156      containerListener.addContainer(container.getId(), container);
1157      nmClientAsync.startContainerAsync(container, ctx);
1158    }
1159  }
1160
1161  private void renameScriptFile(final Path renamedScriptPath)
1162      throws IOException, InterruptedException {
1163    appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
1164      @Override
1165      public Void run() throws IOException {
1166        FileSystem fs = renamedScriptPath.getFileSystem(conf);
1167        fs.rename(new Path(scriptPath), renamedScriptPath);
1168        return null;
1169      }
1170    });
1171    LOG.info("User " + appSubmitterUgi.getUserName()
1172        + " added suffix(.sh/.bat) to script file as " + renamedScriptPath);
1173  }
1174
1175  /**
1176   * Setup the request that will be sent to the RM for the container ask.
1177   *
1178   * @return the setup ResourceRequest to be sent to RM
1179   */
1180  private ContainerRequest setupContainerAskForRM() {
1181    // setup requirements for hosts
1182    // using * as any host will do for the distributed shell app
1183    // set the priority for the request
1184    // TODO - what is the range for priority? how to decide?
1185    Priority pri = Priority.newInstance(requestPriority);
1186
1187    // Set up resource type requirements
1188    // For now, memory and CPU are supported so we set memory and cpu requirements
1189    Resource capability = Resource.newInstance(containerMemory,
1190      containerVirtualCores);
1191
1192    ContainerRequest request = new ContainerRequest(capability, null, null,
1193        pri);
1194    LOG.info("Requested container ask: " + request.toString());
1195    return request;
1196  }
1197
1198  private boolean fileExist(String filePath) {
1199    return new File(filePath).exists();
1200  }
1201
1202  private String readContent(String filePath) throws IOException {
1203    DataInputStream ds = null;
1204    try {
1205      ds = new DataInputStream(new FileInputStream(filePath));
1206      return ds.readUTF();
1207    } finally {
1208      org.apache.commons.io.IOUtils.closeQuietly(ds);
1209    }
1210  }
1211
1212  private void publishContainerStartEvent(
1213      final TimelineClient timelineClient, final Container container,
1214      String domainId, UserGroupInformation ugi) {
1215    final TimelineEntity entity = new TimelineEntity();
1216    entity.setEntityId(container.getId().toString());
1217    entity.setEntityType(DSEntity.DS_CONTAINER.toString());
1218    entity.setDomainId(domainId);
1219    entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());
1220    entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME, container.getId()
1221        .getApplicationAttemptId().getApplicationId().toString());
1222    TimelineEvent event = new TimelineEvent();
1223    event.setTimestamp(System.currentTimeMillis());
1224    event.setEventType(DSEvent.DS_CONTAINER_START.toString());
1225    event.addEventInfo("Node", container.getNodeId().toString());
1226    event.addEventInfo("Resources", container.getResource().toString());
1227    entity.addEvent(event);
1228
1229    try {
1230      processTimelineResponseErrors(
1231          putContainerEntity(timelineClient,
1232              container.getId().getApplicationAttemptId(),
1233              entity));
1234    } catch (YarnException | IOException | ClientHandlerException e) {
1235      LOG.error("Container start event could not be published for "
1236          + container.getId().toString(), e);
1237    }
1238  }
1239
1240  @VisibleForTesting
1241  void publishContainerEndEvent(
1242      final TimelineClient timelineClient, ContainerStatus container,
1243      String domainId, UserGroupInformation ugi) {
1244    final TimelineEntity entity = new TimelineEntity();
1245    entity.setEntityId(container.getContainerId().toString());
1246    entity.setEntityType(DSEntity.DS_CONTAINER.toString());
1247    entity.setDomainId(domainId);
1248    entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());
1249    entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME,
1250        container.getContainerId().getApplicationAttemptId()
1251            .getApplicationId().toString());
1252    TimelineEvent event = new TimelineEvent();
1253    event.setTimestamp(System.currentTimeMillis());
1254    event.setEventType(DSEvent.DS_CONTAINER_END.toString());
1255    event.addEventInfo("State", container.getState().name());
1256    event.addEventInfo("Exit Status", container.getExitStatus());
1257    entity.addEvent(event);
1258    try {
1259      processTimelineResponseErrors(
1260          putContainerEntity(timelineClient,
1261              container.getContainerId().getApplicationAttemptId(),
1262              entity));
1263    } catch (YarnException | IOException | ClientHandlerException e) {
1264      LOG.error("Container end event could not be published for "
1265          + container.getContainerId().toString(), e);
1266    }
1267  }
1268
1269  private TimelinePutResponse putContainerEntity(
1270      TimelineClient timelineClient, ApplicationAttemptId currAttemptId,
1271      TimelineEntity entity)
1272      throws YarnException, IOException {
1273    if (TimelineUtils.timelineServiceV1_5Enabled(conf)) {
1274      TimelineEntityGroupId groupId = TimelineEntityGroupId.newInstance(
1275          currAttemptId.getApplicationId(),
1276          CONTAINER_ENTITY_GROUP_ID);
1277      return timelineClient.putEntities(currAttemptId, groupId, entity);
1278    } else {
1279      return timelineClient.putEntities(entity);
1280    }
1281  }
1282
1283  private void publishApplicationAttemptEvent(
1284      final TimelineClient timelineClient, String appAttemptId,
1285      DSEvent appEvent, String domainId, UserGroupInformation ugi) {
1286    final TimelineEntity entity = new TimelineEntity();
1287    entity.setEntityId(appAttemptId);
1288    entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
1289    entity.setDomainId(domainId);
1290    entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());
1291    TimelineEvent event = new TimelineEvent();
1292    event.setEventType(appEvent.toString());
1293    event.setTimestamp(System.currentTimeMillis());
1294    entity.addEvent(event);
1295    try {
1296      TimelinePutResponse response = timelineClient.putEntities(entity);
1297      processTimelineResponseErrors(response);
1298    } catch (YarnException | IOException | ClientHandlerException e) {
1299      LOG.error("App Attempt "
1300          + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
1301          + " event could not be published for "
1302          + appAttemptID, e);
1303    }
1304  }
1305
1306  private TimelinePutResponse processTimelineResponseErrors(
1307      TimelinePutResponse response) {
1308    List<TimelinePutResponse.TimelinePutError> errors = response.getErrors();
1309    if (errors.size() == 0) {
1310      LOG.debug("Timeline entities are successfully put");
1311    } else {
1312      for (TimelinePutResponse.TimelinePutError error : errors) {
1313        LOG.error(
1314            "Error when publishing entity [" + error.getEntityType() + ","
1315                + error.getEntityId() + "], server side error code: "
1316                + error.getErrorCode());
1317      }
1318    }
1319    return response;
1320  }
1321
1322  RMCallbackHandler getRMCallbackHandler() {
1323    return new RMCallbackHandler();
1324  }
1325
1326  @VisibleForTesting
1327  void setAmRMClient(AMRMClientAsync client) {
1328    this.amRMClient = client;
1329  }
1330
1331  @VisibleForTesting
1332  int getNumCompletedContainers() {
1333    return numCompletedContainers.get();
1334  }
1335
1336  @VisibleForTesting
1337  boolean getDone() {
1338    return done;
1339  }
1340
1341  @VisibleForTesting
1342  Thread createLaunchContainerThread(Container allocatedContainer,
1343      String shellId) {
1344    LaunchContainerRunnable runnableLaunchContainer =
1345        new LaunchContainerRunnable(allocatedContainer, containerListener,
1346            shellId);
1347    return new Thread(runnableLaunchContainer);
1348  }
1349
1350  private void publishContainerStartEventOnTimelineServiceV2(
1351      Container container) {
1352    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
1353        entity =
1354            new org.apache.hadoop.yarn.api.records.timelineservice.
1355            TimelineEntity();
1356    entity.setId(container.getId().toString());
1357    entity.setType(DSEntity.DS_CONTAINER.toString());
1358    long ts = System.currentTimeMillis();
1359    entity.setCreatedTime(ts);
1360    entity.addInfo("user", appSubmitterUgi.getShortUserName());
1361
1362    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
1363        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
1364    event.setTimestamp(ts);
1365    event.setId(DSEvent.DS_CONTAINER_START.toString());
1366    event.addInfo("Node", container.getNodeId().toString());
1367    event.addInfo("Resources", container.getResource().toString());
1368    entity.addEvent(event);
1369
1370    try {
1371      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
1372        @Override
1373        public TimelinePutResponse run() throws Exception {
1374          timelineClient.putEntities(entity);
1375          return null;
1376        }
1377      });
1378    } catch (Exception e) {
1379      LOG.error("Container start event could not be published for "
1380          + container.getId().toString(),
1381          e instanceof UndeclaredThrowableException ? e.getCause() : e);
1382    }
1383  }
1384
1385  private void publishContainerEndEventOnTimelineServiceV2(
1386      final ContainerStatus container) {
1387    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
1388        entity =
1389            new org.apache.hadoop.yarn.api.records.timelineservice.
1390            TimelineEntity();
1391    entity.setId(container.getContainerId().toString());
1392    entity.setType(DSEntity.DS_CONTAINER.toString());
1393    //entity.setDomainId(domainId);
1394    entity.addInfo("user", appSubmitterUgi.getShortUserName());
1395    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
1396        new  org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
1397    event.setTimestamp(System.currentTimeMillis());
1398    event.setId(DSEvent.DS_CONTAINER_END.toString());
1399    event.addInfo("State", container.getState().name());
1400    event.addInfo("Exit Status", container.getExitStatus());
1401    entity.addEvent(event);
1402
1403    try {
1404      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
1405        @Override
1406        public TimelinePutResponse run() throws Exception {
1407          timelineClient.putEntities(entity);
1408          return null;
1409        }
1410      });
1411    } catch (Exception e) {
1412      LOG.error("Container end event could not be published for "
1413          + container.getContainerId().toString(),
1414          e instanceof UndeclaredThrowableException ? e.getCause() : e);
1415    }
1416  }
1417
1418  private void publishApplicationAttemptEventOnTimelineServiceV2(
1419      DSEvent appEvent) {
1420    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
1421        entity =
1422            new org.apache.hadoop.yarn.api.records.timelineservice.
1423            TimelineEntity();
1424    entity.setId(appAttemptID.toString());
1425    entity.setType(DSEntity.DS_APP_ATTEMPT.toString());
1426    long ts = System.currentTimeMillis();
1427    if (appEvent == DSEvent.DS_APP_ATTEMPT_START) {
1428      entity.setCreatedTime(ts);
1429    }
1430    entity.addInfo("user", appSubmitterUgi.getShortUserName());
1431    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
1432        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
1433    event.setId(appEvent.toString());
1434    event.setTimestamp(ts);
1435    entity.addEvent(event);
1436
1437    try {
1438      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
1439        @Override
1440        public TimelinePutResponse run() throws Exception {
1441          timelineClient.putEntitiesAsync(entity);
1442          return null;
1443        }
1444      });
1445    } catch (Exception e) {
1446      LOG.error("App Attempt "
1447          + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
1448          + " event could not be published for "
1449          + appAttemptID,
1450          e instanceof UndeclaredThrowableException ? e.getCause() : e);
1451    }
1452  }
1453
1454}