001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.yarn.applications.distributedshell; 020 021import java.io.IOException; 022import java.nio.ByteBuffer; 023import java.util.ArrayList; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029import java.util.Vector; 030 031import org.apache.commons.cli.CommandLine; 032import org.apache.commons.cli.GnuParser; 033import org.apache.commons.cli.HelpFormatter; 034import org.apache.commons.cli.Option; 035import org.apache.commons.cli.Options; 036import org.apache.commons.cli.ParseException; 037import org.apache.commons.io.IOUtils; 038import org.apache.commons.lang.StringUtils; 039import org.apache.commons.logging.Log; 040import org.apache.commons.logging.LogFactory; 041import org.apache.hadoop.classification.InterfaceAudience; 042import org.apache.hadoop.classification.InterfaceStability; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.fs.FSDataOutputStream; 045import org.apache.hadoop.fs.FileStatus; 046import org.apache.hadoop.fs.FileSystem; 047import org.apache.hadoop.fs.Path; 048import org.apache.hadoop.fs.permission.FsPermission; 049import org.apache.hadoop.io.DataOutputBuffer; 050import org.apache.hadoop.security.Credentials; 051import org.apache.hadoop.security.UserGroupInformation; 052import org.apache.hadoop.security.token.Token; 053import org.apache.hadoop.yarn.api.ApplicationClientProtocol; 054import org.apache.hadoop.yarn.api.ApplicationConstants; 055import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; 056import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; 057import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; 058import org.apache.hadoop.yarn.api.records.ApplicationId; 059import org.apache.hadoop.yarn.api.records.ApplicationReport; 060import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; 061import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; 062import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; 063import org.apache.hadoop.yarn.api.records.LocalResource; 064import org.apache.hadoop.yarn.api.records.LocalResourceType; 065import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; 066import org.apache.hadoop.yarn.api.records.NodeReport; 067import org.apache.hadoop.yarn.api.records.NodeState; 068import org.apache.hadoop.yarn.api.records.Priority; 069import org.apache.hadoop.yarn.api.records.QueueACL; 070import org.apache.hadoop.yarn.api.records.QueueInfo; 071import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; 072import org.apache.hadoop.yarn.api.records.Resource; 073import org.apache.hadoop.yarn.api.records.URL; 074import org.apache.hadoop.yarn.api.records.YarnApplicationState; 075import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; 076import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; 077import org.apache.hadoop.yarn.client.api.TimelineClient; 078import org.apache.hadoop.yarn.client.api.YarnClient; 079import org.apache.hadoop.yarn.client.api.YarnClientApplication; 080import org.apache.hadoop.yarn.client.util.YarnClientUtils; 081import org.apache.hadoop.yarn.conf.YarnConfiguration; 082import org.apache.hadoop.yarn.exceptions.YarnException; 083import org.apache.hadoop.yarn.util.ConverterUtils; 084import org.apache.hadoop.yarn.util.timeline.TimelineUtils; 085 086/** 087 * Client for Distributed Shell application submission to YARN. 088 * 089 * <p> The distributed shell client allows an application master to be launched that in turn would run 090 * the provided shell command on a set of containers. </p> 091 * 092 * <p>This client is meant to act as an example on how to write yarn-based applications. </p> 093 * 094 * <p> To submit an application, a client first needs to connect to the <code>ResourceManager</code> 095 * aka ApplicationsManager or ASM via the {@link ApplicationClientProtocol}. The {@link ApplicationClientProtocol} 096 * provides a way for the client to get access to cluster information and to request for a 097 * new {@link ApplicationId}. <p> 098 * 099 * <p> For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}. 100 * The {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId} 101 * and application name, the priority assigned to the application and the queue 102 * to which this application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext} 103 * also defines the {@link ContainerLaunchContext} which describes the <code>Container</code> with which 104 * the {@link ApplicationMaster} is launched. </p> 105 * 106 * <p> The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the 107 * {@link ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available 108 * and the environment to be set for the {@link ApplicationMaster} and the commands to be executed to run the 109 * {@link ApplicationMaster}. <p> 110 * 111 * <p> Using the {@link ApplicationSubmissionContext}, the client submits the application to the 112 * <code>ResourceManager</code> and then monitors the application by requesting the <code>ResourceManager</code> 113 * for an {@link ApplicationReport} at regular time intervals. In case of the application taking too long, the client 114 * kills the application by submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>. </p> 115 * 116 */ 117@InterfaceAudience.Public 118@InterfaceStability.Unstable 119public class Client { 120 121 private static final Log LOG = LogFactory.getLog(Client.class); 122 123 // Configuration 124 private Configuration conf; 125 private YarnClient yarnClient; 126 // Application master specific info to register a new Application with RM/ASM 127 private String appName = ""; 128 // App master priority 129 private int amPriority = 0; 130 // Queue for App master 131 private String amQueue = ""; 132 // Amt. of memory resource to request for to run the App Master 133 private long amMemory = 100; 134 // Amt. of virtual core resource to request for to run the App Master 135 private int amVCores = 1; 136 137 // Application master jar file 138 private String appMasterJar = ""; 139 // Main class to invoke application master 140 private final String appMasterMainClass; 141 142 // Shell command to be executed 143 private String shellCommand = ""; 144 // Location of shell script 145 private String shellScriptPath = ""; 146 // Args to be passed to the shell command 147 private String[] shellArgs = new String[] {}; 148 // Env variables to be setup for the shell command 149 private Map<String, String> shellEnv = new HashMap<String, String>(); 150 // Shell Command Container priority 151 private int shellCmdPriority = 0; 152 153 // Amt of memory to request for container in which shell script will be executed 154 private int containerMemory = 10; 155 // Amt. of virtual cores to request for container in which shell script will be executed 156 private int containerVirtualCores = 1; 157 // No. of containers in which the shell script needs to be executed 158 private int numContainers = 1; 159 private String nodeLabelExpression = null; 160 161 // log4j.properties file 162 // if available, add to local resources and set into classpath 163 private String log4jPropFile = ""; 164 165 // Start time for client 166 private final long clientStartTime = System.currentTimeMillis(); 167 // Timeout threshold for client. Kill app after time interval expires. 168 private long clientTimeout = 600000; 169 170 // flag to indicate whether to keep containers across application attempts. 171 private boolean keepContainers = false; 172 173 private long attemptFailuresValidityInterval = -1; 174 175 private Vector<CharSequence> containerRetryOptions = new Vector<>(5); 176 177 // Debug flag 178 boolean debugFlag = false; 179 180 // Timeline domain ID 181 private String domainId = null; 182 183 // Flag to indicate whether to create the domain of the given ID 184 private boolean toCreateDomain = false; 185 186 // Timeline domain reader access control 187 private String viewACLs = null; 188 189 // Timeline domain writer access control 190 private String modifyACLs = null; 191 192 private String flowName = null; 193 private String flowVersion = null; 194 private long flowRunId = 0L; 195 196 // Command line options 197 private Options opts; 198 199 private static final String shellCommandPath = "shellCommands"; 200 private static final String shellArgsPath = "shellArgs"; 201 private static final String appMasterJarPath = "AppMaster.jar"; 202 // Hardcoded path to custom log_properties 203 private static final String log4jPath = "log4j.properties"; 204 205 public static final String SCRIPT_PATH = "ExecScript"; 206 207 /** 208 * @param args Command line arguments 209 */ 210 public static void main(String[] args) { 211 boolean result = false; 212 try { 213 Client client = new Client(); 214 LOG.info("Initializing Client"); 215 try { 216 boolean doRun = client.init(args); 217 if (!doRun) { 218 System.exit(0); 219 } 220 } catch (IllegalArgumentException e) { 221 System.err.println(e.getLocalizedMessage()); 222 client.printUsage(); 223 System.exit(-1); 224 } 225 result = client.run(); 226 } catch (Throwable t) { 227 LOG.fatal("Error running Client", t); 228 System.exit(1); 229 } 230 if (result) { 231 LOG.info("Application completed successfully"); 232 System.exit(0); 233 } 234 LOG.error("Application failed to complete successfully"); 235 System.exit(2); 236 } 237 238 /** 239 */ 240 public Client(Configuration conf) throws Exception { 241 this( 242 "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster", 243 conf); 244 } 245 246 Client(String appMasterMainClass, Configuration conf) { 247 this.conf = conf; 248 this.appMasterMainClass = appMasterMainClass; 249 yarnClient = YarnClient.createYarnClient(); 250 yarnClient.init(conf); 251 opts = new Options(); 252 opts.addOption("appname", true, "Application Name. Default value - DistributedShell"); 253 opts.addOption("priority", true, "Application Priority. Default 0"); 254 opts.addOption("queue", true, "RM Queue in which this application is to be submitted"); 255 opts.addOption("timeout", true, "Application timeout in milliseconds"); 256 opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master"); 257 opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master"); 258 opts.addOption("jar", true, "Jar file containing the application master"); 259 opts.addOption("shell_command", true, "Shell command to be executed by " + 260 "the Application Master. Can only specify either --shell_command " + 261 "or --shell_script"); 262 opts.addOption("shell_script", true, "Location of the shell script to be " + 263 "executed. Can only specify either --shell_command or --shell_script"); 264 opts.addOption("shell_args", true, "Command line args for the shell script." + 265 "Multiple args can be separated by empty space."); 266 opts.getOption("shell_args").setArgs(Option.UNLIMITED_VALUES); 267 opts.addOption("shell_env", true, 268 "Environment for shell script. Specified as env_key=env_val pairs"); 269 opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers"); 270 opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); 271 opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); 272 opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); 273 opts.addOption("log_properties", true, "log4j.properties file"); 274 opts.addOption("keep_containers_across_application_attempts", false, 275 "Flag to indicate whether to keep containers across application attempts." + 276 " If the flag is true, running containers will not be killed when" + 277 " application attempt fails and these containers will be retrieved by" + 278 " the new application attempt "); 279 opts.addOption("attempt_failures_validity_interval", true, 280 "when attempt_failures_validity_interval in milliseconds is set to > 0," + 281 "the failure number will not take failures which happen out of " + 282 "the validityInterval into failure count. " + 283 "If failure count reaches to maxAppAttempts, " + 284 "the application will be failed."); 285 opts.addOption("debug", false, "Dump out debug information"); 286 opts.addOption("domain", true, "ID of the timeline domain where the " 287 + "timeline entities will be put"); 288 opts.addOption("view_acls", true, "Users and groups that allowed to " 289 + "view the timeline entities in the given domain"); 290 opts.addOption("modify_acls", true, "Users and groups that allowed to " 291 + "modify the timeline entities in the given domain"); 292 opts.addOption("create", false, "Flag to indicate whether to create the " 293 + "domain specified with -domain."); 294 opts.addOption("flow_name", true, "Flow name which the distributed shell " 295 + "app belongs to"); 296 opts.addOption("flow_version", true, "Flow version which the distributed " 297 + "shell app belongs to"); 298 opts.addOption("flow_run_id", true, "Flow run ID which the distributed " 299 + "shell app belongs to"); 300 opts.addOption("help", false, "Print usage"); 301 opts.addOption("node_label_expression", true, 302 "Node label expression to determine the nodes" 303 + " where all the containers of this application" 304 + " will be allocated, \"\" means containers" 305 + " can be allocated anywhere, if you don't specify the option," 306 + " default node_label_expression of queue will be used."); 307 opts.addOption("container_retry_policy", true, 308 "Retry policy when container fails to run, " 309 + "0: NEVER_RETRY, 1: RETRY_ON_ALL_ERRORS, " 310 + "2: RETRY_ON_SPECIFIC_ERROR_CODES"); 311 opts.addOption("container_retry_error_codes", true, 312 "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODES, error " 313 + "codes is specified with this option, " 314 + "e.g. --container_retry_error_codes 1,2,3"); 315 opts.addOption("container_max_retries", true, 316 "If container could retry, it specifies max retires"); 317 opts.addOption("container_retry_interval", true, 318 "Interval between each retry, unit is milliseconds"); 319 } 320 321 /** 322 */ 323 public Client() throws Exception { 324 this(new YarnConfiguration()); 325 } 326 327 /** 328 * Helper function to print out usage 329 */ 330 private void printUsage() { 331 new HelpFormatter().printHelp("Client", opts); 332 } 333 334 /** 335 * Parse command line options 336 * @param args Parsed command line options 337 * @return Whether the init was successful to run the client 338 * @throws ParseException 339 */ 340 public boolean init(String[] args) throws ParseException { 341 342 CommandLine cliParser = new GnuParser().parse(opts, args); 343 344 if (args.length == 0) { 345 throw new IllegalArgumentException("No args specified for client to initialize"); 346 } 347 348 if (cliParser.hasOption("log_properties")) { 349 String log4jPath = cliParser.getOptionValue("log_properties"); 350 try { 351 Log4jPropertyHelper.updateLog4jConfiguration(Client.class, log4jPath); 352 } catch (Exception e) { 353 LOG.warn("Can not set up custom log4j properties. " + e); 354 } 355 } 356 357 if (cliParser.hasOption("help")) { 358 printUsage(); 359 return false; 360 } 361 362 if (cliParser.hasOption("debug")) { 363 debugFlag = true; 364 365 } 366 367 if (cliParser.hasOption("keep_containers_across_application_attempts")) { 368 LOG.info("keep_containers_across_application_attempts"); 369 keepContainers = true; 370 } 371 372 appName = cliParser.getOptionValue("appname", "DistributedShell"); 373 amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0")); 374 amQueue = cliParser.getOptionValue("queue", "default"); 375 amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "100")); 376 amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1")); 377 378 if (amMemory < 0) { 379 throw new IllegalArgumentException("Invalid memory specified for application master, exiting." 380 + " Specified memory=" + amMemory); 381 } 382 if (amVCores < 0) { 383 throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting." 384 + " Specified virtual cores=" + amVCores); 385 } 386 387 if (!cliParser.hasOption("jar")) { 388 throw new IllegalArgumentException("No jar file specified for application master"); 389 } 390 391 appMasterJar = cliParser.getOptionValue("jar"); 392 393 if (!cliParser.hasOption("shell_command") && !cliParser.hasOption("shell_script")) { 394 throw new IllegalArgumentException( 395 "No shell command or shell script specified to be executed by application master"); 396 } else if (cliParser.hasOption("shell_command") && cliParser.hasOption("shell_script")) { 397 throw new IllegalArgumentException("Can not specify shell_command option " + 398 "and shell_script option at the same time"); 399 } else if (cliParser.hasOption("shell_command")) { 400 shellCommand = cliParser.getOptionValue("shell_command"); 401 } else { 402 shellScriptPath = cliParser.getOptionValue("shell_script"); 403 } 404 if (cliParser.hasOption("shell_args")) { 405 shellArgs = cliParser.getOptionValues("shell_args"); 406 } 407 if (cliParser.hasOption("shell_env")) { 408 String envs[] = cliParser.getOptionValues("shell_env"); 409 for (String env : envs) { 410 env = env.trim(); 411 int index = env.indexOf('='); 412 if (index == -1) { 413 shellEnv.put(env, ""); 414 continue; 415 } 416 String key = env.substring(0, index); 417 String val = ""; 418 if (index < (env.length()-1)) { 419 val = env.substring(index+1); 420 } 421 shellEnv.put(key, val); 422 } 423 } 424 shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0")); 425 426 containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10")); 427 containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1")); 428 numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1")); 429 430 431 if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) { 432 throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified," 433 + " exiting." 434 + " Specified containerMemory=" + containerMemory 435 + ", containerVirtualCores=" + containerVirtualCores 436 + ", numContainer=" + numContainers); 437 } 438 439 nodeLabelExpression = cliParser.getOptionValue("node_label_expression", null); 440 441 clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000")); 442 443 attemptFailuresValidityInterval = 444 Long.parseLong(cliParser.getOptionValue( 445 "attempt_failures_validity_interval", "-1")); 446 447 log4jPropFile = cliParser.getOptionValue("log_properties", ""); 448 449 // Get timeline domain options 450 if (cliParser.hasOption("domain")) { 451 domainId = cliParser.getOptionValue("domain"); 452 toCreateDomain = cliParser.hasOption("create"); 453 if (cliParser.hasOption("view_acls")) { 454 viewACLs = cliParser.getOptionValue("view_acls"); 455 } 456 if (cliParser.hasOption("modify_acls")) { 457 modifyACLs = cliParser.getOptionValue("modify_acls"); 458 } 459 } 460 461 // Get container retry options 462 if (cliParser.hasOption("container_retry_policy")) { 463 containerRetryOptions.add("--container_retry_policy " 464 + cliParser.getOptionValue("container_retry_policy")); 465 } 466 if (cliParser.hasOption("container_retry_error_codes")) { 467 containerRetryOptions.add("--container_retry_error_codes " 468 + cliParser.getOptionValue("container_retry_error_codes")); 469 } 470 if (cliParser.hasOption("container_max_retries")) { 471 containerRetryOptions.add("--container_max_retries " 472 + cliParser.getOptionValue("container_max_retries")); 473 } 474 if (cliParser.hasOption("container_retry_interval")) { 475 containerRetryOptions.add("--container_retry_interval " 476 + cliParser.getOptionValue("container_retry_interval")); 477 } 478 479 if (cliParser.hasOption("flow_name")) { 480 flowName = cliParser.getOptionValue("flow_name"); 481 } 482 if (cliParser.hasOption("flow_version")) { 483 flowVersion = cliParser.getOptionValue("flow_version"); 484 } 485 if (cliParser.hasOption("flow_run_id")) { 486 try { 487 flowRunId = Long.parseLong(cliParser.getOptionValue("flow_run_id")); 488 } catch (NumberFormatException e) { 489 throw new IllegalArgumentException( 490 "Flow run is not a valid long value", e); 491 } 492 } 493 return true; 494 } 495 496 /** 497 * Main run function for the client 498 * @return true if application completed successfully 499 * @throws IOException 500 * @throws YarnException 501 */ 502 public boolean run() throws IOException, YarnException { 503 504 LOG.info("Running Client"); 505 yarnClient.start(); 506 507 YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics(); 508 LOG.info("Got Cluster metric info from ASM" 509 + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers()); 510 511 List<NodeReport> clusterNodeReports = yarnClient.getNodeReports( 512 NodeState.RUNNING); 513 LOG.info("Got Cluster node info from ASM"); 514 for (NodeReport node : clusterNodeReports) { 515 LOG.info("Got node report from ASM for" 516 + ", nodeId=" + node.getNodeId() 517 + ", nodeAddress=" + node.getHttpAddress() 518 + ", nodeRackName=" + node.getRackName() 519 + ", nodeNumContainers=" + node.getNumContainers()); 520 } 521 522 QueueInfo queueInfo = yarnClient.getQueueInfo(this.amQueue); 523 LOG.info("Queue info" 524 + ", queueName=" + queueInfo.getQueueName() 525 + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity() 526 + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity() 527 + ", queueApplicationCount=" + queueInfo.getApplications().size() 528 + ", queueChildQueueCount=" + queueInfo.getChildQueues().size()); 529 530 List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo(); 531 for (QueueUserACLInfo aclInfo : listAclInfo) { 532 for (QueueACL userAcl : aclInfo.getUserAcls()) { 533 LOG.info("User ACL Info for Queue" 534 + ", queueName=" + aclInfo.getQueueName() 535 + ", userAcl=" + userAcl.name()); 536 } 537 } 538 539 if (domainId != null && domainId.length() > 0 && toCreateDomain) { 540 prepareTimelineDomain(); 541 } 542 543 // Get a new application id 544 YarnClientApplication app = yarnClient.createApplication(); 545 GetNewApplicationResponse appResponse = app.getNewApplicationResponse(); 546 // TODO get min/max resource capabilities from RM and change memory ask if needed 547 // If we do not have min/max, we may not be able to correctly request 548 // the required resources from the RM for the app master 549 // Memory ask has to be a multiple of min and less than max. 550 // Dump out information about cluster capability as seen by the resource manager 551 long maxMem = appResponse.getMaximumResourceCapability().getMemorySize(); 552 LOG.info("Max mem capability of resources in this cluster " + maxMem); 553 554 // A resource ask cannot exceed the max. 555 if (amMemory > maxMem) { 556 LOG.info("AM memory specified above max threshold of cluster. Using max value." 557 + ", specified=" + amMemory 558 + ", max=" + maxMem); 559 amMemory = maxMem; 560 } 561 562 int maxVCores = appResponse.getMaximumResourceCapability().getVirtualCores(); 563 LOG.info("Max virtual cores capability of resources in this cluster " + maxVCores); 564 565 if (amVCores > maxVCores) { 566 LOG.info("AM virtual cores specified above max threshold of cluster. " 567 + "Using max value." + ", specified=" + amVCores 568 + ", max=" + maxVCores); 569 amVCores = maxVCores; 570 } 571 572 // set the application name 573 ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); 574 ApplicationId appId = appContext.getApplicationId(); 575 576 appContext.setKeepContainersAcrossApplicationAttempts(keepContainers); 577 appContext.setApplicationName(appName); 578 579 if (attemptFailuresValidityInterval >= 0) { 580 appContext 581 .setAttemptFailuresValidityInterval(attemptFailuresValidityInterval); 582 } 583 584 Set<String> tags = new HashSet<String>(); 585 if (flowName != null) { 586 tags.add(TimelineUtils.generateFlowNameTag(flowName)); 587 } 588 if (flowVersion != null) { 589 tags.add(TimelineUtils.generateFlowVersionTag(flowVersion)); 590 } 591 if (flowRunId != 0) { 592 tags.add(TimelineUtils.generateFlowRunIdTag(flowRunId)); 593 } 594 appContext.setApplicationTags(tags); 595 596 // set local resources for the application master 597 // local files or archives as needed 598 // In this scenario, the jar file for the application master is part of the local resources 599 Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); 600 601 LOG.info("Copy App Master jar from local filesystem and add to local environment"); 602 // Copy the application master jar to the filesystem 603 // Create a local resource to point to the destination jar path 604 FileSystem fs = FileSystem.get(conf); 605 addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(), 606 localResources, null); 607 608 // Set the log4j properties if needed 609 if (!log4jPropFile.isEmpty()) { 610 addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(), 611 localResources, null); 612 } 613 614 // The shell script has to be made available on the final container(s) 615 // where it will be executed. 616 // To do this, we need to first copy into the filesystem that is visible 617 // to the yarn framework. 618 // We do not need to set this as a local resource for the application 619 // master as the application master does not need it. 620 String hdfsShellScriptLocation = ""; 621 long hdfsShellScriptLen = 0; 622 long hdfsShellScriptTimestamp = 0; 623 if (!shellScriptPath.isEmpty()) { 624 Path shellSrc = new Path(shellScriptPath); 625 String shellPathSuffix = 626 appName + "/" + appId.toString() + "/" + SCRIPT_PATH; 627 Path shellDst = 628 new Path(fs.getHomeDirectory(), shellPathSuffix); 629 fs.copyFromLocalFile(false, true, shellSrc, shellDst); 630 hdfsShellScriptLocation = shellDst.toUri().toString(); 631 FileStatus shellFileStatus = fs.getFileStatus(shellDst); 632 hdfsShellScriptLen = shellFileStatus.getLen(); 633 hdfsShellScriptTimestamp = shellFileStatus.getModificationTime(); 634 } 635 636 if (!shellCommand.isEmpty()) { 637 addToLocalResources(fs, null, shellCommandPath, appId.toString(), 638 localResources, shellCommand); 639 } 640 641 if (shellArgs.length > 0) { 642 addToLocalResources(fs, null, shellArgsPath, appId.toString(), 643 localResources, StringUtils.join(shellArgs, " ")); 644 } 645 646 // Set the necessary security tokens as needed 647 //amContainer.setContainerTokens(containerToken); 648 649 // Set the env variables to be setup in the env where the application master will be run 650 LOG.info("Set the environment for the application master"); 651 Map<String, String> env = new HashMap<String, String>(); 652 653 // put location of shell script into env 654 // using the env info, the application master will create the correct local resource for the 655 // eventual containers that will be launched to execute the shell scripts 656 env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation); 657 env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp)); 658 env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen)); 659 if (domainId != null && domainId.length() > 0) { 660 env.put(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN, domainId); 661 } 662 663 // Add AppMaster.jar location to classpath 664 // At some point we should not be required to add 665 // the hadoop specific classpaths to the env. 666 // It should be provided out of the box. 667 // For now setting all required classpaths including 668 // the classpath to "." for the application jar 669 StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$()) 670 .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*"); 671 for (String c : conf.getStrings( 672 YarnConfiguration.YARN_APPLICATION_CLASSPATH, 673 YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) { 674 classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR); 675 classPathEnv.append(c.trim()); 676 } 677 classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append( 678 "./log4j.properties"); 679 680 // add the runtime classpath needed for tests to work 681 if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) { 682 classPathEnv.append(':'); 683 classPathEnv.append(System.getProperty("java.class.path")); 684 } 685 686 env.put("CLASSPATH", classPathEnv.toString()); 687 688 // Set the necessary command to execute the application master 689 Vector<CharSequence> vargs = new Vector<CharSequence>(30); 690 691 // Set java executable command 692 LOG.info("Setting up app master command"); 693 vargs.add(Environment.JAVA_HOME.$$() + "/bin/java"); 694 // Set Xmx based on am memory size 695 vargs.add("-Xmx" + amMemory + "m"); 696 // Set class name 697 vargs.add(appMasterMainClass); 698 // Set params for Application Master 699 vargs.add("--container_memory " + String.valueOf(containerMemory)); 700 vargs.add("--container_vcores " + String.valueOf(containerVirtualCores)); 701 vargs.add("--num_containers " + String.valueOf(numContainers)); 702 if (null != nodeLabelExpression) { 703 appContext.setNodeLabelExpression(nodeLabelExpression); 704 } 705 vargs.add("--priority " + String.valueOf(shellCmdPriority)); 706 707 for (Map.Entry<String, String> entry : shellEnv.entrySet()) { 708 vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue()); 709 } 710 if (debugFlag) { 711 vargs.add("--debug"); 712 } 713 714 vargs.addAll(containerRetryOptions); 715 716 vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); 717 vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); 718 719 // Get final commmand 720 StringBuilder command = new StringBuilder(); 721 for (CharSequence str : vargs) { 722 command.append(str).append(" "); 723 } 724 725 LOG.info("Completed setting up app master command " + command.toString()); 726 List<String> commands = new ArrayList<String>(); 727 commands.add(command.toString()); 728 729 // Set up the container launch context for the application master 730 ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance( 731 localResources, env, commands, null, null, null); 732 733 // Set up resource type requirements 734 // For now, both memory and vcores are supported, so we set memory and 735 // vcores requirements 736 Resource capability = Resource.newInstance(amMemory, amVCores); 737 appContext.setResource(capability); 738 739 // Service data is a binary blob that can be passed to the application 740 // Not needed in this scenario 741 // amContainer.setServiceData(serviceData); 742 743 // Setup security tokens 744 if (UserGroupInformation.isSecurityEnabled()) { 745 // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce 746 Credentials credentials = new Credentials(); 747 String tokenRenewer = YarnClientUtils.getRmPrincipal(conf); 748 if (tokenRenewer == null || tokenRenewer.length() == 0) { 749 throw new IOException( 750 "Can't get Master Kerberos principal for the RM to use as renewer"); 751 } 752 753 // For now, only getting tokens for the default file-system. 754 final Token<?> tokens[] = 755 fs.addDelegationTokens(tokenRenewer, credentials); 756 if (tokens != null) { 757 for (Token<?> token : tokens) { 758 LOG.info("Got dt for " + fs.getUri() + "; " + token); 759 } 760 } 761 DataOutputBuffer dob = new DataOutputBuffer(); 762 credentials.writeTokenStorageToStream(dob); 763 ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); 764 amContainer.setTokens(fsTokens); 765 } 766 767 appContext.setAMContainerSpec(amContainer); 768 769 // Set the priority for the application master 770 // TODO - what is the range for priority? how to decide? 771 Priority pri = Priority.newInstance(amPriority); 772 appContext.setPriority(pri); 773 774 // Set the queue to which this application is to be submitted in the RM 775 appContext.setQueue(amQueue); 776 777 // Submit the application to the applications manager 778 // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest); 779 // Ignore the response as either a valid response object is returned on success 780 // or an exception thrown to denote some form of a failure 781 LOG.info("Submitting application to ASM"); 782 783 yarnClient.submitApplication(appContext); 784 785 // TODO 786 // Try submitting the same request again 787 // app submission failure? 788 789 // Monitor the application 790 return monitorApplication(appId); 791 792 } 793 794 /** 795 * Monitor the submitted application for completion. 796 * Kill application if time expires. 797 * @param appId Application Id of application to be monitored 798 * @return true if application completed successfully 799 * @throws YarnException 800 * @throws IOException 801 */ 802 private boolean monitorApplication(ApplicationId appId) 803 throws YarnException, IOException { 804 805 while (true) { 806 807 // Check app status every 1 second. 808 try { 809 Thread.sleep(1000); 810 } catch (InterruptedException e) { 811 LOG.debug("Thread sleep in monitoring loop interrupted"); 812 } 813 814 // Get application report for the appId we are interested in 815 ApplicationReport report = yarnClient.getApplicationReport(appId); 816 817 LOG.info("Got application report from ASM for" 818 + ", appId=" + appId.getId() 819 + ", clientToAMToken=" + report.getClientToAMToken() 820 + ", appDiagnostics=" + report.getDiagnostics() 821 + ", appMasterHost=" + report.getHost() 822 + ", appQueue=" + report.getQueue() 823 + ", appMasterRpcPort=" + report.getRpcPort() 824 + ", appStartTime=" + report.getStartTime() 825 + ", yarnAppState=" + report.getYarnApplicationState().toString() 826 + ", distributedFinalState=" + report.getFinalApplicationStatus().toString() 827 + ", appTrackingUrl=" + report.getTrackingUrl() 828 + ", appUser=" + report.getUser()); 829 830 YarnApplicationState state = report.getYarnApplicationState(); 831 FinalApplicationStatus dsStatus = report.getFinalApplicationStatus(); 832 if (YarnApplicationState.FINISHED == state) { 833 if (FinalApplicationStatus.SUCCEEDED == dsStatus) { 834 LOG.info("Application has completed successfully. Breaking monitoring loop"); 835 return true; 836 } 837 else { 838 LOG.info("Application did finished unsuccessfully." 839 + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString() 840 + ". Breaking monitoring loop"); 841 return false; 842 } 843 } 844 else if (YarnApplicationState.KILLED == state 845 || YarnApplicationState.FAILED == state) { 846 LOG.info("Application did not finish." 847 + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString() 848 + ". Breaking monitoring loop"); 849 return false; 850 } 851 852 if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) { 853 LOG.info("Reached client specified timeout for application. Killing application"); 854 forceKillApplication(appId); 855 return false; 856 } 857 } 858 859 } 860 861 /** 862 * Kill a submitted application by sending a call to the ASM 863 * @param appId Application Id to be killed. 864 * @throws YarnException 865 * @throws IOException 866 */ 867 private void forceKillApplication(ApplicationId appId) 868 throws YarnException, IOException { 869 // TODO clarify whether multiple jobs with the same app id can be submitted and be running at 870 // the same time. 871 // If yes, can we kill a particular attempt only? 872 873 // Response can be ignored as it is non-null on success or 874 // throws an exception in case of failures 875 yarnClient.killApplication(appId); 876 } 877 878 private void addToLocalResources(FileSystem fs, String fileSrcPath, 879 String fileDstPath, String appId, Map<String, LocalResource> localResources, 880 String resources) throws IOException { 881 String suffix = 882 appName + "/" + appId + "/" + fileDstPath; 883 Path dst = 884 new Path(fs.getHomeDirectory(), suffix); 885 if (fileSrcPath == null) { 886 FSDataOutputStream ostream = null; 887 try { 888 ostream = FileSystem 889 .create(fs, dst, new FsPermission((short) 0710)); 890 ostream.writeUTF(resources); 891 } finally { 892 IOUtils.closeQuietly(ostream); 893 } 894 } else { 895 fs.copyFromLocalFile(new Path(fileSrcPath), dst); 896 } 897 FileStatus scFileStatus = fs.getFileStatus(dst); 898 LocalResource scRsrc = 899 LocalResource.newInstance( 900 URL.fromURI(dst.toUri()), 901 LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 902 scFileStatus.getLen(), scFileStatus.getModificationTime()); 903 localResources.put(fileDstPath, scRsrc); 904 } 905 906 private void prepareTimelineDomain() { 907 TimelineClient timelineClient = null; 908 if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, 909 YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { 910 timelineClient = TimelineClient.createTimelineClient(); 911 timelineClient.init(conf); 912 timelineClient.start(); 913 } else { 914 LOG.warn("Cannot put the domain " + domainId + 915 " because the timeline service is not enabled"); 916 return; 917 } 918 try { 919 //TODO: we need to check and combine the existing timeline domain ACLs, 920 //but let's do it once we have client java library to query domains. 921 TimelineDomain domain = new TimelineDomain(); 922 domain.setId(domainId); 923 domain.setReaders( 924 viewACLs != null && viewACLs.length() > 0 ? viewACLs : " "); 925 domain.setWriters( 926 modifyACLs != null && modifyACLs.length() > 0 ? modifyACLs : " "); 927 timelineClient.putDomain(domain); 928 LOG.info("Put the timeline domain: " + 929 TimelineUtils.dumpTimelineRecordtoJSON(domain)); 930 } catch (Exception e) { 931 LOG.error("Error when putting the timeline domain", e); 932 } finally { 933 timelineClient.stop(); 934 } 935 } 936}