001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.yarn.applications.distributedshell; 020 021import java.io.BufferedReader; 022import java.io.DataInputStream; 023import java.io.File; 024import java.io.FileInputStream; 025import java.io.IOException; 026import java.io.StringReader; 027import java.lang.reflect.UndeclaredThrowableException; 028import java.net.URI; 029import java.net.URISyntaxException; 030import java.nio.ByteBuffer; 031import java.security.PrivilegedExceptionAction; 032import java.util.ArrayList; 033import java.util.Collections; 034import java.util.HashMap; 035import java.util.HashSet; 036import java.util.Iterator; 037import java.util.List; 038import java.util.Map; 039import java.util.Set; 040import java.util.Vector; 041import java.util.concurrent.ConcurrentHashMap; 042import java.util.concurrent.ConcurrentMap; 043import java.util.concurrent.atomic.AtomicInteger; 044 045import org.apache.commons.cli.CommandLine; 046import org.apache.commons.cli.GnuParser; 047import org.apache.commons.cli.HelpFormatter; 048import org.apache.commons.cli.Options; 049import org.apache.commons.cli.ParseException; 050import org.apache.commons.logging.Log; 
051import org.apache.commons.logging.LogFactory; 052import org.apache.hadoop.classification.InterfaceAudience; 053import org.apache.hadoop.classification.InterfaceAudience.Private; 054import org.apache.hadoop.classification.InterfaceStability; 055import org.apache.hadoop.conf.Configuration; 056import org.apache.hadoop.fs.FileSystem; 057import org.apache.hadoop.fs.Path; 058import org.apache.hadoop.io.DataOutputBuffer; 059import org.apache.hadoop.io.IOUtils; 060import org.apache.hadoop.net.NetUtils; 061import org.apache.hadoop.security.Credentials; 062import org.apache.hadoop.security.UserGroupInformation; 063import org.apache.hadoop.security.token.Token; 064import org.apache.hadoop.util.ExitUtil; 065import org.apache.hadoop.util.Shell; 066import org.apache.hadoop.yarn.api.ApplicationConstants; 067import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; 068import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; 069import org.apache.hadoop.yarn.api.ContainerManagementProtocol; 070import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; 071import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; 072import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; 073import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; 074import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; 075import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; 076import org.apache.hadoop.yarn.api.records.Container; 077import org.apache.hadoop.yarn.api.records.ContainerExitStatus; 078import org.apache.hadoop.yarn.api.records.ContainerId; 079import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; 080import org.apache.hadoop.yarn.api.records.ContainerRetryContext; 081import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; 082import org.apache.hadoop.yarn.api.records.ContainerState; 083import org.apache.hadoop.yarn.api.records.ContainerStatus; 084import 
org.apache.hadoop.yarn.api.records.FinalApplicationStatus; 085import org.apache.hadoop.yarn.api.records.LocalResource; 086import org.apache.hadoop.yarn.api.records.LocalResourceType; 087import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; 088import org.apache.hadoop.yarn.api.records.NodeReport; 089import org.apache.hadoop.yarn.api.records.Priority; 090import org.apache.hadoop.yarn.api.records.Resource; 091import org.apache.hadoop.yarn.api.records.ResourceRequest; 092import org.apache.hadoop.yarn.api.records.URL; 093import org.apache.hadoop.yarn.api.records.UpdatedContainer; 094import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; 095import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; 096import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; 097import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; 098import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; 099import org.apache.hadoop.yarn.client.api.TimelineClient; 100import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; 101import org.apache.hadoop.yarn.client.api.async.NMClientAsync; 102import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; 103import org.apache.hadoop.yarn.conf.YarnConfiguration; 104import org.apache.hadoop.yarn.exceptions.YarnException; 105import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; 106import org.apache.hadoop.yarn.util.timeline.TimelineUtils; 107import org.apache.log4j.LogManager; 108 109import com.google.common.annotations.VisibleForTesting; 110import com.sun.jersey.api.client.ClientHandlerException; 111 112/** 113 * An ApplicationMaster for executing shell commands on a set of launched 114 * containers using the YARN framework. 115 * 116 * <p> 117 * This class is meant to act as an example on how to write yarn-based 118 * application masters. 
 * </p>
 *
 * <p>
 * The ApplicationMaster is started on a container by the
 * <code>ResourceManager</code>'s launcher. The first thing that the
 * <code>ApplicationMaster</code> needs to do is to connect and register itself
 * with the <code>ResourceManager</code>. The registration tells the
 * <code>ResourceManager</code> which host:port the ApplicationMaster listens
 * on to serve clients, as well as a tracking URL that a client can use to
 * follow status/job history if needed. However, the distributed shell
 * supports neither a tracking URL nor an appMasterHost:appMasterRpcPort
 * endpoint.
 * </p>
 *
 * <p>
 * The <code>ApplicationMaster</code> needs to send a heartbeat to the
 * <code>ResourceManager</code> at regular intervals to inform the
 * <code>ResourceManager</code> that it is up and alive. Each
 * {@link ApplicationMasterProtocol#allocate} call from the
 * <code>ApplicationMaster</code> to the <code>ResourceManager</code> acts as a
 * heartbeat.
 * </p>
 *
 * <p>
 * For the actual handling of the job, the <code>ApplicationMaster</code> has to
 * request the required number of containers from the
 * <code>ResourceManager</code> via an {@link AllocateRequest}, using
 * {@link ResourceRequest}s that carry the necessary resource specifications,
 * such as node location and computational (memory/disk/cpu) requirements. The
 * <code>ResourceManager</code> responds with an {@link AllocateResponse} that
 * informs the <code>ApplicationMaster</code> of newly allocated containers,
 * completed containers, and the current state of available resources.
149 * </p> 150 * 151 * <p> 152 * For each allocated container, the <code>ApplicationMaster</code> can then set 153 * up the necessary launch context via {@link ContainerLaunchContext} to specify 154 * the allocated container id, local resources required by the executable, the 155 * environment to be setup for the executable, commands to execute, etc. and 156 * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to 157 * launch and execute the defined commands on the given allocated container. 158 * </p> 159 * 160 * <p> 161 * The <code>ApplicationMaster</code> can monitor the launched container by 162 * either querying the <code>ResourceManager</code> using 163 * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via 164 * the {@link ContainerManagementProtocol} by querying for the status of the allocated 165 * container's {@link ContainerId}. 166 * 167 * <p> 168 * After the job has been completed, the <code>ApplicationMaster</code> has to 169 * send a {@link FinishApplicationMasterRequest} to the 170 * <code>ResourceManager</code> to inform it that the 171 * <code>ApplicationMaster</code> has been completed. 172 */ 173@InterfaceAudience.Public 174@InterfaceStability.Unstable 175public class ApplicationMaster { 176 177 private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); 178 179 @VisibleForTesting 180 @Private 181 public static enum DSEvent { 182 DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END 183 } 184 185 @VisibleForTesting 186 @Private 187 public static enum DSEntity { 188 DS_APP_ATTEMPT, DS_CONTAINER 189 } 190 191 private static final String YARN_SHELL_ID = "YARN_SHELL_ID"; 192 193 // Configuration 194 private Configuration conf; 195 196 // Handle to communicate with the Resource Manager 197 @SuppressWarnings("rawtypes") 198 private AMRMClientAsync amRMClient; 199 200 // In both secure and non-secure modes, this points to the job-submitter. 
201 @VisibleForTesting 202 UserGroupInformation appSubmitterUgi; 203 204 // Handle to communicate with the Node Manager 205 private NMClientAsync nmClientAsync; 206 // Listen to process the response from the Node Manager 207 private NMCallbackHandler containerListener; 208 209 // Application Attempt Id ( combination of attemptId and fail count ) 210 @VisibleForTesting 211 protected ApplicationAttemptId appAttemptID; 212 213 // TODO 214 // For status update for clients - yet to be implemented 215 // Hostname of the container 216 private String appMasterHostname = ""; 217 // Port on which the app master listens for status updates from clients 218 private int appMasterRpcPort = -1; 219 // Tracking url to which app master publishes info for clients to monitor 220 private String appMasterTrackingUrl = ""; 221 222 private boolean timelineServiceV2 = false; 223 224 // App Master configuration 225 // No. of containers to run shell command on 226 @VisibleForTesting 227 protected int numTotalContainers = 1; 228 // Memory to request for the container on which the shell command will run 229 private long containerMemory = 10; 230 // VirtualCores to request for the container on which the shell command will run 231 private int containerVirtualCores = 1; 232 // Priority of the request 233 private int requestPriority; 234 235 // Counter for completed containers ( complete denotes successful or failed ) 236 private AtomicInteger numCompletedContainers = new AtomicInteger(); 237 // Allocated container count so that we know how many containers has the RM 238 // allocated to us 239 @VisibleForTesting 240 protected AtomicInteger numAllocatedContainers = new AtomicInteger(); 241 // Count of failed containers 242 private AtomicInteger numFailedContainers = new AtomicInteger(); 243 // Count of containers already requested from the RM 244 // Needed as once requested, we should not request for containers again. 245 // Only request for more if the original requirement changes. 
246 @VisibleForTesting 247 protected AtomicInteger numRequestedContainers = new AtomicInteger(); 248 249 // Shell command to be executed 250 private String shellCommand = ""; 251 // Args to be passed to the shell command 252 private String shellArgs = ""; 253 // Env variables to be setup for the shell command 254 private Map<String, String> shellEnv = new HashMap<String, String>(); 255 256 // Location of shell script ( obtained from info set in env ) 257 // Shell script path in fs 258 private String scriptPath = ""; 259 // Timestamp needed for creating a local resource 260 private long shellScriptPathTimestamp = 0; 261 // File length needed for local resource 262 private long shellScriptPathLen = 0; 263 264 // Container retry options 265 private ContainerRetryPolicy containerRetryPolicy = 266 ContainerRetryPolicy.NEVER_RETRY; 267 private Set<Integer> containerRetryErrorCodes = null; 268 private int containerMaxRetries = 0; 269 private int containrRetryInterval = 0; 270 271 // Timeline domain ID 272 private String domainId = null; 273 274 // Hardcoded path to shell script in launch container's local env 275 private static final String EXEC_SHELL_STRING_PATH = Client.SCRIPT_PATH 276 + ".sh"; 277 private static final String EXEC_BAT_SCRIPT_STRING_PATH = Client.SCRIPT_PATH 278 + ".bat"; 279 280 // Hardcoded path to custom log_properties 281 private static final String log4jPath = "log4j.properties"; 282 283 private static final String shellCommandPath = "shellCommands"; 284 private static final String shellArgsPath = "shellArgs"; 285 286 private volatile boolean done; 287 288 private ByteBuffer allTokens; 289 290 // Launch threads 291 private List<Thread> launchThreads = new ArrayList<Thread>(); 292 293 // Timeline Client 294 @VisibleForTesting 295 TimelineClient timelineClient; 296 static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS"; 297 static final String APPID_TIMELINE_FILTER_NAME = "appId"; 298 static final String USER_TIMELINE_FILTER_NAME = "user"; 299 
300 private final String linux_bash_command = "bash"; 301 private final String windows_command = "cmd /c"; 302 303 private int yarnShellIdCounter = 1; 304 305 @VisibleForTesting 306 protected final Set<ContainerId> launchedContainers = 307 Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>()); 308 309 /** 310 * @param args Command line args 311 */ 312 public static void main(String[] args) { 313 boolean result = false; 314 try { 315 ApplicationMaster appMaster = new ApplicationMaster(); 316 LOG.info("Initializing ApplicationMaster"); 317 boolean doRun = appMaster.init(args); 318 if (!doRun) { 319 System.exit(0); 320 } 321 appMaster.run(); 322 result = appMaster.finish(); 323 } catch (Throwable t) { 324 LOG.fatal("Error running ApplicationMaster", t); 325 LogManager.shutdown(); 326 ExitUtil.terminate(1, t); 327 } 328 if (result) { 329 LOG.info("Application Master completed successfully. exiting"); 330 System.exit(0); 331 } else { 332 LOG.info("Application Master failed. exiting"); 333 System.exit(2); 334 } 335 } 336 337 /** 338 * Dump out contents of $CWD and the environment to stdout for debugging 339 */ 340 private void dumpOutDebugInfo() { 341 342 LOG.info("Dump debug output"); 343 Map<String, String> envs = System.getenv(); 344 for (Map.Entry<String, String> env : envs.entrySet()) { 345 LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue()); 346 System.out.println("System env: key=" + env.getKey() + ", val=" 347 + env.getValue()); 348 } 349 350 BufferedReader buf = null; 351 try { 352 String lines = Shell.WINDOWS ? 
Shell.execCommand("cmd", "/c", "dir") : 353 Shell.execCommand("ls", "-al"); 354 buf = new BufferedReader(new StringReader(lines)); 355 String line = ""; 356 while ((line = buf.readLine()) != null) { 357 LOG.info("System CWD content: " + line); 358 System.out.println("System CWD content: " + line); 359 } 360 } catch (IOException e) { 361 e.printStackTrace(); 362 } finally { 363 IOUtils.cleanup(LOG, buf); 364 } 365 } 366 367 public ApplicationMaster() { 368 // Set up the configuration 369 conf = new YarnConfiguration(); 370 } 371 372 /** 373 * Parse command line options 374 * 375 * @param args Command line args 376 * @return Whether init successful and run should be invoked 377 * @throws ParseException 378 * @throws IOException 379 */ 380 public boolean init(String[] args) throws ParseException, IOException { 381 Options opts = new Options(); 382 opts.addOption("app_attempt_id", true, 383 "App Attempt ID. Not to be used unless for testing purposes"); 384 opts.addOption("shell_env", true, 385 "Environment for shell script. Specified as env_key=env_val pairs"); 386 opts.addOption("container_memory", true, 387 "Amount of memory in MB to be requested to run the shell command"); 388 opts.addOption("container_vcores", true, 389 "Amount of virtual cores to be requested to run the shell command"); 390 opts.addOption("num_containers", true, 391 "No. of containers on which the shell command needs to be executed"); 392 opts.addOption("priority", true, "Application Priority. Default 0"); 393 opts.addOption("container_retry_policy", true, 394 "Retry policy when container fails to run, " 395 + "0: NEVER_RETRY, 1: RETRY_ON_ALL_ERRORS, " 396 + "2: RETRY_ON_SPECIFIC_ERROR_CODES"); 397 opts.addOption("container_retry_error_codes", true, 398 "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODES, error " 399 + "codes is specified with this option, " 400 + "e.g. 
--container_retry_error_codes 1,2,3"); 401 opts.addOption("container_max_retries", true, 402 "If container could retry, it specifies max retires"); 403 opts.addOption("container_retry_interval", true, 404 "Interval between each retry, unit is milliseconds"); 405 opts.addOption("debug", false, "Dump out debug information"); 406 407 opts.addOption("help", false, "Print usage"); 408 CommandLine cliParser = new GnuParser().parse(opts, args); 409 410 if (args.length == 0) { 411 printUsage(opts); 412 throw new IllegalArgumentException( 413 "No args specified for application master to initialize"); 414 } 415 416 //Check whether customer log4j.properties file exists 417 if (fileExist(log4jPath)) { 418 try { 419 Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class, 420 log4jPath); 421 } catch (Exception e) { 422 LOG.warn("Can not set up custom log4j properties. " + e); 423 } 424 } 425 426 if (cliParser.hasOption("help")) { 427 printUsage(opts); 428 return false; 429 } 430 431 if (cliParser.hasOption("debug")) { 432 dumpOutDebugInfo(); 433 } 434 435 Map<String, String> envs = System.getenv(); 436 437 if (!envs.containsKey(Environment.CONTAINER_ID.name())) { 438 if (cliParser.hasOption("app_attempt_id")) { 439 String appIdStr = cliParser.getOptionValue("app_attempt_id", ""); 440 appAttemptID = ApplicationAttemptId.fromString(appIdStr); 441 } else { 442 throw new IllegalArgumentException( 443 "Application Attempt Id not set in the environment"); 444 } 445 } else { 446 ContainerId containerId = ContainerId.fromString(envs 447 .get(Environment.CONTAINER_ID.name())); 448 appAttemptID = containerId.getApplicationAttemptId(); 449 } 450 451 if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) { 452 throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV 453 + " not set in the environment"); 454 } 455 if (!envs.containsKey(Environment.NM_HOST.name())) { 456 throw new RuntimeException(Environment.NM_HOST.name() 457 + " not set in the 
environment"); 458 } 459 if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) { 460 throw new RuntimeException(Environment.NM_HTTP_PORT 461 + " not set in the environment"); 462 } 463 if (!envs.containsKey(Environment.NM_PORT.name())) { 464 throw new RuntimeException(Environment.NM_PORT.name() 465 + " not set in the environment"); 466 } 467 468 LOG.info("Application master for app" + ", appId=" 469 + appAttemptID.getApplicationId().getId() + ", clustertimestamp=" 470 + appAttemptID.getApplicationId().getClusterTimestamp() 471 + ", attemptId=" + appAttemptID.getAttemptId()); 472 473 if (!fileExist(shellCommandPath) 474 && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) { 475 throw new IllegalArgumentException( 476 "No shell command or shell script specified to be executed by application master"); 477 } 478 479 if (fileExist(shellCommandPath)) { 480 shellCommand = readContent(shellCommandPath); 481 } 482 483 if (fileExist(shellArgsPath)) { 484 shellArgs = readContent(shellArgsPath); 485 } 486 487 if (cliParser.hasOption("shell_env")) { 488 String shellEnvs[] = cliParser.getOptionValues("shell_env"); 489 for (String env : shellEnvs) { 490 env = env.trim(); 491 int index = env.indexOf('='); 492 if (index == -1) { 493 shellEnv.put(env, ""); 494 continue; 495 } 496 String key = env.substring(0, index); 497 String val = ""; 498 if (index < (env.length() - 1)) { 499 val = env.substring(index + 1); 500 } 501 shellEnv.put(key, val); 502 } 503 } 504 505 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) { 506 scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION); 507 508 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) { 509 shellScriptPathTimestamp = Long.parseLong(envs 510 .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)); 511 } 512 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) { 513 shellScriptPathLen = Long.parseLong(envs 514 .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)); 515 } 516 if 
(!scriptPath.isEmpty() 517 && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) { 518 LOG.error("Illegal values in env for shell script path" + ", path=" 519 + scriptPath + ", len=" + shellScriptPathLen + ", timestamp=" 520 + shellScriptPathTimestamp); 521 throw new IllegalArgumentException( 522 "Illegal values in env for shell script path"); 523 } 524 } 525 526 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) { 527 domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN); 528 } 529 530 containerMemory = Integer.parseInt(cliParser.getOptionValue( 531 "container_memory", "10")); 532 containerVirtualCores = Integer.parseInt(cliParser.getOptionValue( 533 "container_vcores", "1")); 534 numTotalContainers = Integer.parseInt(cliParser.getOptionValue( 535 "num_containers", "1")); 536 if (numTotalContainers == 0) { 537 throw new IllegalArgumentException( 538 "Cannot run distributed shell with no containers"); 539 } 540 requestPriority = Integer.parseInt(cliParser 541 .getOptionValue("priority", "0")); 542 543 containerRetryPolicy = ContainerRetryPolicy.values()[ 544 Integer.parseInt(cliParser.getOptionValue( 545 "container_retry_policy", "0"))]; 546 if (cliParser.hasOption("container_retry_error_codes")) { 547 containerRetryErrorCodes = new HashSet<>(); 548 for (String errorCode : 549 cliParser.getOptionValue("container_retry_error_codes").split(",")) { 550 containerRetryErrorCodes.add(Integer.parseInt(errorCode)); 551 } 552 } 553 containerMaxRetries = Integer.parseInt( 554 cliParser.getOptionValue("container_max_retries", "0")); 555 containrRetryInterval = Integer.parseInt(cliParser.getOptionValue( 556 "container_retry_interval", "0")); 557 558 if (YarnConfiguration.timelineServiceEnabled(conf)) { 559 timelineServiceV2 = YarnConfiguration.timelineServiceV2Enabled(conf); 560 } else { 561 timelineClient = null; 562 LOG.warn("Timeline service is not enabled"); 563 } 564 565 return true; 566 } 567 568 /** 569 * Helper function to print usage 
570 * 571 * @param opts Parsed command line options 572 */ 573 private void printUsage(Options opts) { 574 new HelpFormatter().printHelp("ApplicationMaster", opts); 575 } 576 577 /** 578 * Main run function for the application master 579 * 580 * @throws YarnException 581 * @throws IOException 582 */ 583 @SuppressWarnings({ "unchecked" }) 584 public void run() throws YarnException, IOException, InterruptedException { 585 LOG.info("Starting ApplicationMaster"); 586 587 // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class 588 // are marked as LimitedPrivate 589 Credentials credentials = 590 UserGroupInformation.getCurrentUser().getCredentials(); 591 DataOutputBuffer dob = new DataOutputBuffer(); 592 credentials.writeTokenStorageToStream(dob); 593 // Now remove the AM->RM token so that containers cannot access it. 594 Iterator<Token<?>> iter = credentials.getAllTokens().iterator(); 595 LOG.info("Executing with tokens:"); 596 while (iter.hasNext()) { 597 Token<?> token = iter.next(); 598 LOG.info(token); 599 if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { 600 iter.remove(); 601 } 602 } 603 allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); 604 605 // Create appSubmitterUgi and add original tokens to it 606 String appSubmitterUserName = 607 System.getenv(ApplicationConstants.Environment.USER.name()); 608 appSubmitterUgi = 609 UserGroupInformation.createRemoteUser(appSubmitterUserName); 610 appSubmitterUgi.addCredentials(credentials); 611 612 AMRMClientAsync.AbstractCallbackHandler allocListener = 613 new RMCallbackHandler(); 614 amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); 615 amRMClient.init(conf); 616 amRMClient.start(); 617 618 containerListener = createNMCallbackHandler(); 619 nmClientAsync = new NMClientAsyncImpl(containerListener); 620 nmClientAsync.init(conf); 621 nmClientAsync.start(); 622 623 startTimelineClient(conf); 624 if (timelineServiceV2) { 625 // need to bind timelineClient 626 
amRMClient.registerTimelineClient(timelineClient); 627 } 628 if(timelineClient != null) { 629 if (timelineServiceV2) { 630 publishApplicationAttemptEventOnTimelineServiceV2( 631 DSEvent.DS_APP_ATTEMPT_START); 632 } else { 633 publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), 634 DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); 635 } 636 } 637 638 // Setup local RPC Server to accept status requests directly from clients 639 // TODO need to setup a protocol for client to be able to communicate to 640 // the RPC server 641 // TODO use the rpc port info to register with the RM for the client to 642 // send requests to this app master 643 644 // Register self with ResourceManager 645 // This will start heartbeating to the RM 646 appMasterHostname = NetUtils.getHostname(); 647 RegisterApplicationMasterResponse response = amRMClient 648 .registerApplicationMaster(appMasterHostname, appMasterRpcPort, 649 appMasterTrackingUrl); 650 // Dump out information about cluster capability as seen by the 651 // resource manager 652 long maxMem = response.getMaximumResourceCapability().getMemorySize(); 653 LOG.info("Max mem capability of resources in this cluster " + maxMem); 654 655 int maxVCores = response.getMaximumResourceCapability().getVirtualCores(); 656 LOG.info("Max vcores capability of resources in this cluster " + maxVCores); 657 658 // A resource ask cannot exceed the max. 659 if (containerMemory > maxMem) { 660 LOG.info("Container memory specified above max threshold of cluster." 661 + " Using max value." + ", specified=" + containerMemory + ", max=" 662 + maxMem); 663 containerMemory = maxMem; 664 } 665 666 if (containerVirtualCores > maxVCores) { 667 LOG.info("Container virtual cores specified above max threshold of cluster." 668 + " Using max value." 
+ ", specified=" + containerVirtualCores + ", max=" 669 + maxVCores); 670 containerVirtualCores = maxVCores; 671 } 672 673 List<Container> previousAMRunningContainers = 674 response.getContainersFromPreviousAttempts(); 675 LOG.info(appAttemptID + " received " + previousAMRunningContainers.size() 676 + " previous attempts' running containers on AM registration."); 677 for(Container container: previousAMRunningContainers) { 678 launchedContainers.add(container.getId()); 679 } 680 numAllocatedContainers.addAndGet(previousAMRunningContainers.size()); 681 682 683 int numTotalContainersToRequest = 684 numTotalContainers - previousAMRunningContainers.size(); 685 // Setup ask for containers from RM 686 // Send request for containers to RM 687 // Until we get our fully allocated quota, we keep on polling RM for 688 // containers 689 // Keep looping until all the containers are launched and shell script 690 // executed on them ( regardless of success/failure). 691 for (int i = 0; i < numTotalContainersToRequest; ++i) { 692 ContainerRequest containerAsk = setupContainerAskForRM(); 693 amRMClient.addContainerRequest(containerAsk); 694 } 695 numRequestedContainers.set(numTotalContainers); 696 } 697 698 @VisibleForTesting 699 void startTimelineClient(final Configuration conf) 700 throws YarnException, IOException, InterruptedException { 701 try { 702 appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() { 703 @Override 704 public Void run() throws Exception { 705 if (YarnConfiguration.timelineServiceEnabled(conf)) { 706 // Creating the Timeline Client 707 if (timelineServiceV2) { 708 timelineClient = TimelineClient.createTimelineClient( 709 appAttemptID.getApplicationId()); 710 LOG.info("Timeline service V2 client is enabled"); 711 } else { 712 timelineClient = TimelineClient.createTimelineClient(); 713 LOG.info("Timeline service V1 client is enabled"); 714 } 715 timelineClient.init(conf); 716 timelineClient.start(); 717 } else { 718 timelineClient = null; 719 
LOG.warn("Timeline service is not enabled"); 720 } 721 return null; 722 } 723 }); 724 } catch (UndeclaredThrowableException e) { 725 throw new YarnException(e.getCause()); 726 } 727 } 728 729 @VisibleForTesting 730 NMCallbackHandler createNMCallbackHandler() { 731 return new NMCallbackHandler(this); 732 } 733 734 @VisibleForTesting 735 protected boolean finish() { 736 // wait for completion. 737 while (!done 738 && (numCompletedContainers.get() != numTotalContainers)) { 739 try { 740 Thread.sleep(200); 741 } catch (InterruptedException ex) {} 742 } 743 744 if (timelineClient != null) { 745 if (timelineServiceV2) { 746 publishApplicationAttemptEventOnTimelineServiceV2( 747 DSEvent.DS_APP_ATTEMPT_END); 748 } else { 749 publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), 750 DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); 751 } 752 } 753 754 // Join all launched threads 755 // needed for when we time out 756 // and we need to release containers 757 for (Thread launchThread : launchThreads) { 758 try { 759 launchThread.join(10000); 760 } catch (InterruptedException e) { 761 LOG.info("Exception thrown in thread join: " + e.getMessage()); 762 e.printStackTrace(); 763 } 764 } 765 766 // When the application completes, it should stop all running containers 767 LOG.info("Application completed. Stopping running containers"); 768 nmClientAsync.stop(); 769 770 // When the application completes, it should send a finish application 771 // signal to the RM 772 LOG.info("Application completed. Signalling finish to RM"); 773 774 FinalApplicationStatus appStatus; 775 String appMessage = null; 776 boolean success = true; 777 if (numCompletedContainers.get() - numFailedContainers.get() 778 >= numTotalContainers) { 779 appStatus = FinalApplicationStatus.SUCCEEDED; 780 } else { 781 appStatus = FinalApplicationStatus.FAILED; 782 appMessage = "Diagnostics." 
+ ", total=" + numTotalContainers 783 + ", completed=" + numCompletedContainers.get() + ", allocated=" 784 + numAllocatedContainers.get() + ", failed=" 785 + numFailedContainers.get(); 786 LOG.info(appMessage); 787 success = false; 788 } 789 try { 790 amRMClient.unregisterApplicationMaster(appStatus, appMessage, null); 791 } catch (YarnException ex) { 792 LOG.error("Failed to unregister application", ex); 793 } catch (IOException e) { 794 LOG.error("Failed to unregister application", e); 795 } 796 797 amRMClient.stop(); 798 799 // Stop Timeline Client 800 if(timelineClient != null) { 801 timelineClient.stop(); 802 } 803 804 return success; 805 } 806 807 @VisibleForTesting 808 class RMCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler { 809 @SuppressWarnings("unchecked") 810 @Override 811 public void onContainersCompleted(List<ContainerStatus> completedContainers) { 812 LOG.info("Got response from RM for container ask, completedCnt=" 813 + completedContainers.size()); 814 for (ContainerStatus containerStatus : completedContainers) { 815 LOG.info(appAttemptID + " got container status for containerID=" 816 + containerStatus.getContainerId() + ", state=" 817 + containerStatus.getState() + ", exitStatus=" 818 + containerStatus.getExitStatus() + ", diagnostics=" 819 + containerStatus.getDiagnostics()); 820 821 // non complete containers should not be here 822 assert (containerStatus.getState() == ContainerState.COMPLETE); 823 // ignore containers we know nothing about - probably from a previous 824 // attempt 825 if (!launchedContainers.contains(containerStatus.getContainerId())) { 826 LOG.info("Ignoring completed status of " 827 + containerStatus.getContainerId() 828 + "; unknown container(probably launched by previous attempt)"); 829 continue; 830 } 831 832 // increment counters for completed/failed containers 833 int exitStatus = containerStatus.getExitStatus(); 834 if (0 != exitStatus) { 835 // container failed 836 if (ContainerExitStatus.ABORTED != 
exitStatus) { 837 // shell script failed 838 // counts as completed 839 numCompletedContainers.incrementAndGet(); 840 numFailedContainers.incrementAndGet(); 841 } else { 842 // container was killed by framework, possibly preempted 843 // we should re-try as the container was lost for some reason 844 numAllocatedContainers.decrementAndGet(); 845 numRequestedContainers.decrementAndGet(); 846 // we do not need to release the container as it would be done 847 // by the RM 848 } 849 } else { 850 // nothing to do 851 // container completed successfully 852 numCompletedContainers.incrementAndGet(); 853 LOG.info("Container completed successfully." + ", containerId=" 854 + containerStatus.getContainerId()); 855 } 856 if(timelineClient != null) { 857 if (timelineServiceV2) { 858 publishContainerEndEventOnTimelineServiceV2(containerStatus); 859 } else { 860 publishContainerEndEvent( 861 timelineClient, containerStatus, domainId, appSubmitterUgi); 862 } 863 } 864 } 865 866 // ask for more containers if any failed 867 int askCount = numTotalContainers - numRequestedContainers.get(); 868 numRequestedContainers.addAndGet(askCount); 869 870 if (askCount > 0) { 871 for (int i = 0; i < askCount; ++i) { 872 ContainerRequest containerAsk = setupContainerAskForRM(); 873 amRMClient.addContainerRequest(containerAsk); 874 } 875 } 876 877 if (numCompletedContainers.get() == numTotalContainers) { 878 done = true; 879 } 880 } 881 882 @Override 883 public void onContainersAllocated(List<Container> allocatedContainers) { 884 LOG.info("Got response from RM for container ask, allocatedCnt=" 885 + allocatedContainers.size()); 886 numAllocatedContainers.addAndGet(allocatedContainers.size()); 887 for (Container allocatedContainer : allocatedContainers) { 888 String yarnShellId = Integer.toString(yarnShellIdCounter); 889 yarnShellIdCounter++; 890 LOG.info("Launching shell command on a new container." 
891 + ", containerId=" + allocatedContainer.getId() 892 + ", yarnShellId=" + yarnShellId 893 + ", containerNode=" + allocatedContainer.getNodeId().getHost() 894 + ":" + allocatedContainer.getNodeId().getPort() 895 + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() 896 + ", containerResourceMemory" 897 + allocatedContainer.getResource().getMemorySize() 898 + ", containerResourceVirtualCores" 899 + allocatedContainer.getResource().getVirtualCores()); 900 // + ", containerToken" 901 // +allocatedContainer.getContainerToken().getIdentifier().toString()); 902 903 Thread launchThread = createLaunchContainerThread(allocatedContainer, 904 yarnShellId); 905 906 // launch and start the container on a separate thread to keep 907 // the main thread unblocked 908 // as all containers may not be allocated at one go. 909 launchThreads.add(launchThread); 910 launchedContainers.add(allocatedContainer.getId()); 911 launchThread.start(); 912 } 913 } 914 915 @Override 916 public void onContainersUpdated( 917 List<UpdatedContainer> containers) {} 918 919 @Override 920 public void onShutdownRequest() { 921 done = true; 922 } 923 924 @Override 925 public void onNodesUpdated(List<NodeReport> updatedNodes) {} 926 927 @Override 928 public float getProgress() { 929 // set progress to deliver to RM on next heartbeat 930 float progress = (float) numCompletedContainers.get() 931 / numTotalContainers; 932 return progress; 933 } 934 935 @Override 936 public void onError(Throwable e) { 937 LOG.error("Error in RMCallbackHandler: ", e); 938 done = true; 939 amRMClient.stop(); 940 } 941 } 942 943 @VisibleForTesting 944 static class NMCallbackHandler extends NMClientAsync.AbstractCallbackHandler { 945 946 private ConcurrentMap<ContainerId, Container> containers = 947 new ConcurrentHashMap<ContainerId, Container>(); 948 private final ApplicationMaster applicationMaster; 949 950 public NMCallbackHandler(ApplicationMaster applicationMaster) { 951 this.applicationMaster = applicationMaster; 
952 } 953 954 public void addContainer(ContainerId containerId, Container container) { 955 containers.putIfAbsent(containerId, container); 956 } 957 958 @Override 959 public void onContainerStopped(ContainerId containerId) { 960 if (LOG.isDebugEnabled()) { 961 LOG.debug("Succeeded to stop Container " + containerId); 962 } 963 containers.remove(containerId); 964 } 965 966 @Override 967 public void onContainerStatusReceived(ContainerId containerId, 968 ContainerStatus containerStatus) { 969 if (LOG.isDebugEnabled()) { 970 LOG.debug("Container Status: id=" + containerId + ", status=" + 971 containerStatus); 972 } 973 } 974 975 @Override 976 public void onContainerStarted(ContainerId containerId, 977 Map<String, ByteBuffer> allServiceResponse) { 978 if (LOG.isDebugEnabled()) { 979 LOG.debug("Succeeded to start Container " + containerId); 980 } 981 Container container = containers.get(containerId); 982 if (container != null) { 983 applicationMaster.nmClientAsync.getContainerStatusAsync( 984 containerId, container.getNodeId()); 985 } 986 if(applicationMaster.timelineClient != null) { 987 if (applicationMaster.timelineServiceV2) { 988 applicationMaster.publishContainerStartEventOnTimelineServiceV2( 989 container); 990 } else { 991 applicationMaster.publishContainerStartEvent( 992 applicationMaster.timelineClient, container, 993 applicationMaster.domainId, applicationMaster.appSubmitterUgi); 994 } 995 } 996 } 997 998 @Override 999 public void onContainerResourceIncreased( 1000 ContainerId containerId, Resource resource) {} 1001 1002 @Override 1003 public void onStartContainerError(ContainerId containerId, Throwable t) { 1004 LOG.error("Failed to start Container " + containerId); 1005 containers.remove(containerId); 1006 applicationMaster.numCompletedContainers.incrementAndGet(); 1007 applicationMaster.numFailedContainers.incrementAndGet(); 1008 } 1009 1010 @Override 1011 public void onGetContainerStatusError( 1012 ContainerId containerId, Throwable t) { 1013 
LOG.error("Failed to query the status of Container " + containerId); 1014 } 1015 1016 @Override 1017 public void onStopContainerError(ContainerId containerId, Throwable t) { 1018 LOG.error("Failed to stop Container " + containerId); 1019 containers.remove(containerId); 1020 } 1021 1022 @Override 1023 public void onIncreaseContainerResourceError( 1024 ContainerId containerId, Throwable t) {} 1025 1026 } 1027 1028 /** 1029 * Thread to connect to the {@link ContainerManagementProtocol} and launch the container 1030 * that will execute the shell command. 1031 */ 1032 private class LaunchContainerRunnable implements Runnable { 1033 1034 // Allocated container 1035 private Container container; 1036 private String shellId; 1037 1038 NMCallbackHandler containerListener; 1039 1040 /** 1041 * @param lcontainer Allocated container 1042 * @param containerListener Callback handler of the container 1043 */ 1044 public LaunchContainerRunnable(Container lcontainer, 1045 NMCallbackHandler containerListener, String shellId) { 1046 this.container = lcontainer; 1047 this.containerListener = containerListener; 1048 this.shellId = shellId; 1049 } 1050 1051 @Override 1052 /** 1053 * Connects to CM, sets up container launch context 1054 * for shell command and eventually dispatches the container 1055 * start request to the CM. 1056 */ 1057 public void run() { 1058 LOG.info("Setting up container launch container for containerid=" 1059 + container.getId() + " with shellid=" + shellId); 1060 1061 // Set the local resources 1062 Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); 1063 1064 // The container for the eventual shell commands needs its own local 1065 // resources too. 1066 // In this scenario, if a shell script is specified, we need to have it 1067 // copied and made available to the container. 
1068 if (!scriptPath.isEmpty()) { 1069 Path renamedScriptPath = null; 1070 if (Shell.WINDOWS) { 1071 renamedScriptPath = new Path(scriptPath + ".bat"); 1072 } else { 1073 renamedScriptPath = new Path(scriptPath + ".sh"); 1074 } 1075 1076 try { 1077 // rename the script file based on the underlying OS syntax. 1078 renameScriptFile(renamedScriptPath); 1079 } catch (Exception e) { 1080 LOG.error( 1081 "Not able to add suffix (.bat/.sh) to the shell script filename", 1082 e); 1083 // We know we cannot continue launching the container 1084 // so we should release it. 1085 numCompletedContainers.incrementAndGet(); 1086 numFailedContainers.incrementAndGet(); 1087 return; 1088 } 1089 1090 URL yarnUrl = null; 1091 try { 1092 yarnUrl = URL.fromURI(new URI(renamedScriptPath.toString())); 1093 } catch (URISyntaxException e) { 1094 LOG.error("Error when trying to use shell script path specified" 1095 + " in env, path=" + renamedScriptPath, e); 1096 // A failure scenario on bad input such as invalid shell script path 1097 // We know we cannot continue launching the container 1098 // so we should release it. 1099 // TODO 1100 numCompletedContainers.incrementAndGet(); 1101 numFailedContainers.incrementAndGet(); 1102 return; 1103 } 1104 LocalResource shellRsrc = LocalResource.newInstance(yarnUrl, 1105 LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 1106 shellScriptPathLen, shellScriptPathTimestamp); 1107 localResources.put(Shell.WINDOWS ? EXEC_BAT_SCRIPT_STRING_PATH : 1108 EXEC_SHELL_STRING_PATH, shellRsrc); 1109 shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command; 1110 } 1111 1112 // Set the necessary command to execute on the allocated container 1113 Vector<CharSequence> vargs = new Vector<CharSequence>(5); 1114 1115 // Set executable command 1116 vargs.add(shellCommand); 1117 // Set shell script path 1118 if (!scriptPath.isEmpty()) { 1119 vargs.add(Shell.WINDOWS ? 
EXEC_BAT_SCRIPT_STRING_PATH 1120 : EXEC_SHELL_STRING_PATH); 1121 } 1122 1123 // Set args for the shell command if any 1124 vargs.add(shellArgs); 1125 // Add log redirect params 1126 vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"); 1127 vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"); 1128 1129 // Get final commmand 1130 StringBuilder command = new StringBuilder(); 1131 for (CharSequence str : vargs) { 1132 command.append(str).append(" "); 1133 } 1134 1135 List<String> commands = new ArrayList<String>(); 1136 commands.add(command.toString()); 1137 1138 // Set up ContainerLaunchContext, setting local resource, environment, 1139 // command and token for constructor. 1140 1141 // Note for tokens: Set up tokens for the container too. Today, for normal 1142 // shell commands, the container in distribute-shell doesn't need any 1143 // tokens. We are populating them mainly for NodeManagers to be able to 1144 // download anyfiles in the distributed file-system. The tokens are 1145 // otherwise also useful in cases, for e.g., when one is running a 1146 // "hadoop dfs" command inside the distributed shell. 
1147 Map<String, String> myShellEnv = new HashMap<String, String>(shellEnv); 1148 myShellEnv.put(YARN_SHELL_ID, shellId); 1149 ContainerRetryContext containerRetryContext = 1150 ContainerRetryContext.newInstance( 1151 containerRetryPolicy, containerRetryErrorCodes, 1152 containerMaxRetries, containrRetryInterval); 1153 ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( 1154 localResources, myShellEnv, commands, null, allTokens.duplicate(), 1155 null, containerRetryContext); 1156 containerListener.addContainer(container.getId(), container); 1157 nmClientAsync.startContainerAsync(container, ctx); 1158 } 1159 } 1160 1161 private void renameScriptFile(final Path renamedScriptPath) 1162 throws IOException, InterruptedException { 1163 appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() { 1164 @Override 1165 public Void run() throws IOException { 1166 FileSystem fs = renamedScriptPath.getFileSystem(conf); 1167 fs.rename(new Path(scriptPath), renamedScriptPath); 1168 return null; 1169 } 1170 }); 1171 LOG.info("User " + appSubmitterUgi.getUserName() 1172 + " added suffix(.sh/.bat) to script file as " + renamedScriptPath); 1173 } 1174 1175 /** 1176 * Setup the request that will be sent to the RM for the container ask. 1177 * 1178 * @return the setup ResourceRequest to be sent to RM 1179 */ 1180 private ContainerRequest setupContainerAskForRM() { 1181 // setup requirements for hosts 1182 // using * as any host will do for the distributed shell app 1183 // set the priority for the request 1184 // TODO - what is the range for priority? how to decide? 
1185 Priority pri = Priority.newInstance(requestPriority); 1186 1187 // Set up resource type requirements 1188 // For now, memory and CPU are supported so we set memory and cpu requirements 1189 Resource capability = Resource.newInstance(containerMemory, 1190 containerVirtualCores); 1191 1192 ContainerRequest request = new ContainerRequest(capability, null, null, 1193 pri); 1194 LOG.info("Requested container ask: " + request.toString()); 1195 return request; 1196 } 1197 1198 private boolean fileExist(String filePath) { 1199 return new File(filePath).exists(); 1200 } 1201 1202 private String readContent(String filePath) throws IOException { 1203 DataInputStream ds = null; 1204 try { 1205 ds = new DataInputStream(new FileInputStream(filePath)); 1206 return ds.readUTF(); 1207 } finally { 1208 org.apache.commons.io.IOUtils.closeQuietly(ds); 1209 } 1210 } 1211 1212 private void publishContainerStartEvent( 1213 final TimelineClient timelineClient, final Container container, 1214 String domainId, UserGroupInformation ugi) { 1215 final TimelineEntity entity = new TimelineEntity(); 1216 entity.setEntityId(container.getId().toString()); 1217 entity.setEntityType(DSEntity.DS_CONTAINER.toString()); 1218 entity.setDomainId(domainId); 1219 entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName()); 1220 entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME, container.getId() 1221 .getApplicationAttemptId().getApplicationId().toString()); 1222 TimelineEvent event = new TimelineEvent(); 1223 event.setTimestamp(System.currentTimeMillis()); 1224 event.setEventType(DSEvent.DS_CONTAINER_START.toString()); 1225 event.addEventInfo("Node", container.getNodeId().toString()); 1226 event.addEventInfo("Resources", container.getResource().toString()); 1227 entity.addEvent(event); 1228 1229 try { 1230 processTimelineResponseErrors( 1231 putContainerEntity(timelineClient, 1232 container.getId().getApplicationAttemptId(), 1233 entity)); 1234 } catch (YarnException | IOException | 
ClientHandlerException e) { 1235 LOG.error("Container start event could not be published for " 1236 + container.getId().toString(), e); 1237 } 1238 } 1239 1240 @VisibleForTesting 1241 void publishContainerEndEvent( 1242 final TimelineClient timelineClient, ContainerStatus container, 1243 String domainId, UserGroupInformation ugi) { 1244 final TimelineEntity entity = new TimelineEntity(); 1245 entity.setEntityId(container.getContainerId().toString()); 1246 entity.setEntityType(DSEntity.DS_CONTAINER.toString()); 1247 entity.setDomainId(domainId); 1248 entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName()); 1249 entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME, 1250 container.getContainerId().getApplicationAttemptId() 1251 .getApplicationId().toString()); 1252 TimelineEvent event = new TimelineEvent(); 1253 event.setTimestamp(System.currentTimeMillis()); 1254 event.setEventType(DSEvent.DS_CONTAINER_END.toString()); 1255 event.addEventInfo("State", container.getState().name()); 1256 event.addEventInfo("Exit Status", container.getExitStatus()); 1257 entity.addEvent(event); 1258 try { 1259 processTimelineResponseErrors( 1260 putContainerEntity(timelineClient, 1261 container.getContainerId().getApplicationAttemptId(), 1262 entity)); 1263 } catch (YarnException | IOException | ClientHandlerException e) { 1264 LOG.error("Container end event could not be published for " 1265 + container.getContainerId().toString(), e); 1266 } 1267 } 1268 1269 private TimelinePutResponse putContainerEntity( 1270 TimelineClient timelineClient, ApplicationAttemptId currAttemptId, 1271 TimelineEntity entity) 1272 throws YarnException, IOException { 1273 if (TimelineUtils.timelineServiceV1_5Enabled(conf)) { 1274 TimelineEntityGroupId groupId = TimelineEntityGroupId.newInstance( 1275 currAttemptId.getApplicationId(), 1276 CONTAINER_ENTITY_GROUP_ID); 1277 return timelineClient.putEntities(currAttemptId, groupId, entity); 1278 } else { 1279 return 
timelineClient.putEntities(entity); 1280 } 1281 } 1282 1283 private void publishApplicationAttemptEvent( 1284 final TimelineClient timelineClient, String appAttemptId, 1285 DSEvent appEvent, String domainId, UserGroupInformation ugi) { 1286 final TimelineEntity entity = new TimelineEntity(); 1287 entity.setEntityId(appAttemptId); 1288 entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString()); 1289 entity.setDomainId(domainId); 1290 entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName()); 1291 TimelineEvent event = new TimelineEvent(); 1292 event.setEventType(appEvent.toString()); 1293 event.setTimestamp(System.currentTimeMillis()); 1294 entity.addEvent(event); 1295 try { 1296 TimelinePutResponse response = timelineClient.putEntities(entity); 1297 processTimelineResponseErrors(response); 1298 } catch (YarnException | IOException | ClientHandlerException e) { 1299 LOG.error("App Attempt " 1300 + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end") 1301 + " event could not be published for " 1302 + appAttemptID, e); 1303 } 1304 } 1305 1306 private TimelinePutResponse processTimelineResponseErrors( 1307 TimelinePutResponse response) { 1308 List<TimelinePutResponse.TimelinePutError> errors = response.getErrors(); 1309 if (errors.size() == 0) { 1310 LOG.debug("Timeline entities are successfully put"); 1311 } else { 1312 for (TimelinePutResponse.TimelinePutError error : errors) { 1313 LOG.error( 1314 "Error when publishing entity [" + error.getEntityType() + "," 1315 + error.getEntityId() + "], server side error code: " 1316 + error.getErrorCode()); 1317 } 1318 } 1319 return response; 1320 } 1321 1322 RMCallbackHandler getRMCallbackHandler() { 1323 return new RMCallbackHandler(); 1324 } 1325 1326 @VisibleForTesting 1327 void setAmRMClient(AMRMClientAsync client) { 1328 this.amRMClient = client; 1329 } 1330 1331 @VisibleForTesting 1332 int getNumCompletedContainers() { 1333 return numCompletedContainers.get(); 1334 } 1335 1336 
@VisibleForTesting 1337 boolean getDone() { 1338 return done; 1339 } 1340 1341 @VisibleForTesting 1342 Thread createLaunchContainerThread(Container allocatedContainer, 1343 String shellId) { 1344 LaunchContainerRunnable runnableLaunchContainer = 1345 new LaunchContainerRunnable(allocatedContainer, containerListener, 1346 shellId); 1347 return new Thread(runnableLaunchContainer); 1348 } 1349 1350 private void publishContainerStartEventOnTimelineServiceV2( 1351 Container container) { 1352 final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity 1353 entity = 1354 new org.apache.hadoop.yarn.api.records.timelineservice. 1355 TimelineEntity(); 1356 entity.setId(container.getId().toString()); 1357 entity.setType(DSEntity.DS_CONTAINER.toString()); 1358 long ts = System.currentTimeMillis(); 1359 entity.setCreatedTime(ts); 1360 entity.addInfo("user", appSubmitterUgi.getShortUserName()); 1361 1362 org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = 1363 new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); 1364 event.setTimestamp(ts); 1365 event.setId(DSEvent.DS_CONTAINER_START.toString()); 1366 event.addInfo("Node", container.getNodeId().toString()); 1367 event.addInfo("Resources", container.getResource().toString()); 1368 entity.addEvent(event); 1369 1370 try { 1371 appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() { 1372 @Override 1373 public TimelinePutResponse run() throws Exception { 1374 timelineClient.putEntities(entity); 1375 return null; 1376 } 1377 }); 1378 } catch (Exception e) { 1379 LOG.error("Container start event could not be published for " 1380 + container.getId().toString(), 1381 e instanceof UndeclaredThrowableException ? 
e.getCause() : e); 1382 } 1383 } 1384 1385 private void publishContainerEndEventOnTimelineServiceV2( 1386 final ContainerStatus container) { 1387 final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity 1388 entity = 1389 new org.apache.hadoop.yarn.api.records.timelineservice. 1390 TimelineEntity(); 1391 entity.setId(container.getContainerId().toString()); 1392 entity.setType(DSEntity.DS_CONTAINER.toString()); 1393 //entity.setDomainId(domainId); 1394 entity.addInfo("user", appSubmitterUgi.getShortUserName()); 1395 org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = 1396 new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); 1397 event.setTimestamp(System.currentTimeMillis()); 1398 event.setId(DSEvent.DS_CONTAINER_END.toString()); 1399 event.addInfo("State", container.getState().name()); 1400 event.addInfo("Exit Status", container.getExitStatus()); 1401 entity.addEvent(event); 1402 1403 try { 1404 appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() { 1405 @Override 1406 public TimelinePutResponse run() throws Exception { 1407 timelineClient.putEntities(entity); 1408 return null; 1409 } 1410 }); 1411 } catch (Exception e) { 1412 LOG.error("Container end event could not be published for " 1413 + container.getContainerId().toString(), 1414 e instanceof UndeclaredThrowableException ? e.getCause() : e); 1415 } 1416 } 1417 1418 private void publishApplicationAttemptEventOnTimelineServiceV2( 1419 DSEvent appEvent) { 1420 final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity 1421 entity = 1422 new org.apache.hadoop.yarn.api.records.timelineservice. 
1423 TimelineEntity(); 1424 entity.setId(appAttemptID.toString()); 1425 entity.setType(DSEntity.DS_APP_ATTEMPT.toString()); 1426 long ts = System.currentTimeMillis(); 1427 if (appEvent == DSEvent.DS_APP_ATTEMPT_START) { 1428 entity.setCreatedTime(ts); 1429 } 1430 entity.addInfo("user", appSubmitterUgi.getShortUserName()); 1431 org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = 1432 new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); 1433 event.setId(appEvent.toString()); 1434 event.setTimestamp(ts); 1435 entity.addEvent(event); 1436 1437 try { 1438 appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() { 1439 @Override 1440 public TimelinePutResponse run() throws Exception { 1441 timelineClient.putEntitiesAsync(entity); 1442 return null; 1443 } 1444 }); 1445 } catch (Exception e) { 1446 LOG.error("App Attempt " 1447 + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end") 1448 + " event could not be published for " 1449 + appAttemptID, 1450 e instanceof UndeclaredThrowableException ? e.getCause() : e); 1451 } 1452 } 1453 1454}